qy4

perl parallel requests curl || poe

Nov 30, 2008 18:22

Посмотрел в LWP::Parallel не подходит, т.к. у него что-странное с таймаутами,
пришлось поковырять WWW::Curl.

Короче, код мне кажется он как-то так должен выглядеть, пока без курловых callback'ов:

#!/usr/bin/perl -w use strict; sub succreq { my ($res_ref, $curl) = @_; warn length($$res_ref); } sub failreq { warn 'failed'; } my $curler = Curler->new(); $curler->add('http://www.ru', [], \&succreq, \&failreq); $curler->add('http://ya.ru', [], \&succreq, \&failreq); $curler->add('http://zzzz.zz',[], \&succreq, \&failreq); $curler->do_requests(0.150); ################################ ################################ ################################ package Curler; use strict; use WWW::Curl::Easy; use WWW::Curl::Multi; use Time::HiRes; sub new { my ($proto, %args) = @_; my $class = ref($proto) || $proto; my $this = {}; bless($this, $class); if ($this->init(%args)) { return $this; } else { return undef; } } sub _set { my ($this, $nam, $val) = @_; my $vars = $this->{'_VARS'}; unless (defined($nam)) { warn "Name isn't specified"; return undef; } $vars->{$nam} = $val; return 1; } sub _get { my ($this, $nam) = @_; return $this->{'_VARS'}->{$nam}; } sub init { my $this = shift; $this->{'_VARS'} = { 'active_handles' => 0, 'instances' => {}, # [OBJ, DATA] 'curlm' => undef, }; my $curlm = WWW::Curl::Multi->new(); unless ($curlm) { warn "WWW::Curl::Multi initialization failed!"; return undef; } $this->_set('curlm', $curlm); return 1; } sub add { my ($this, $url, $headers, $succ_callback, $fail_callback) = @_; # Init the curl session my $curle = WWW::Curl::Easy->new(); unless ($curle) { warn("WWW::Curl::Easy initialization failed!"); return undef; } $curle->setopt(CURLOPT_HTTPHEADER, $headers); $curle->setopt(CURLOPT_URL, $url); # BUG: bug with curl timeouts < 1 s (curl immidiatly return timeout), fixed whith ALRM # dont use CURLOPT_TIMEOUT... # $curl_bm->setopt(CURLOPT_TIMEOUT_MS, 100); my $res; # NOTE - do not use a typeglob here. A reference to a typeglob is okay though. open (my $file, ">", \$res); $curle->setopt(CURLOPT_WRITEDATA, $file); my $p_id = 0; my $instances = $this->_get('instances'); 1 while ($instances->{++$p_id}); $curle->setopt(CURLOPT_PRIVATE, $p_id); # private ID my $curlm = $this->_get('curlm'); $curlm->add_handle($curle); $instances->{$p_id} = { 'curl' => $curle, 'result' => \$res, 'succ_callback' => $succ_callback, 'func' => $fail_callback, }; $this->_set('instances', $instances); return 1; } sub do_requests { my ($this, $timeout) = @_; $timeout ||= 1; my $instances = $this->_get('instances'); my $curlm = $this->_get('curlm'); my @postpr; # Do multiple requests my $t0 = Time::HiRes::time; local $SIG{ALRM} = sub { die '%%cURLTimeout%%'; }; Time::HiRes::alarm($timeout); eval { while (scalar(keys %$instances)) { my $active_transfers = $curlm->perform; if ($active_transfers != scalar(keys %$instances)) { while (my ($p_id, $rv) = $curlm->info_read) { if ($p_id) { my $inst = $instances->{$p_id}; delete $instances->{$p_id}; # do the usual result/error checking routine here unless ($rv == 0) { warn("HTTP request error cURL_id=$p_id err: " . $inst->{'curl'}->strerror($rv) . " ($rv)"); } else { $inst->{'func'} = $inst->{'succ_callback'}; } push @postpr, $inst; } } } Time::HiRes::sleep(0.005); } Time::HiRes::alarm(0); }; Time::HiRes::alarm(0); local $SIG{ALRM} = sub {}; if ($@ && $@ =~ /^%%cURLTimeout%%/) { warn sprintf("Timeout %.3fsec", $timeout); } elsif ($@) { warn("cURL error: " . $@); } foreach my $pp (values(%$instances), @postpr) { if (UNIVERSAL::isa($pp->{'func'}, 'CODE')) { &{$pp->{'func'}}($pp->{'result'}, $pp->{'curl'}); } else { warn 'nocallback'; } } } 1;

perl, http, parallel

Previous post Next post
Up