1

我正在尝试编写一个脚本,该脚本将N通过 HTTP 同时下载大多数文件。

我以前使用AnyEvent::Worker::Pool来管理 BLOCKING 任务池。我还结合使用AnyEvent::HTTPAnyEvent->condvar来单独管理非阻塞下载。

我认为结合这两种方法应该非常简单,以便AnyEvent->condvar使 AnyEvent::HTTP::http_get 从AnyEvent::Worker::Pool的角度来看是 BLOCKING 。

但是,我遇到了一些我不明白的错误,大概是由于AnyEvent::Worker的实现细节。这是演示该问题的脚本的真正精简版本:

use EV;
use AnyEvent 5;
use AnyEvent::Worker::Pool;
use AnyEvent::HTTP;
use 5.10.0;
use strict;

my $pool_size = 2;
my $num_jobs  = 7;

# Create a pool of $pool_size workers
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub {
  my ($job) = @_;
  eval {
    my $cv = AnyEvent->condvar;
    print "worker starting download [$job] ...\n";
    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
      my ($data, $headers) = @_;
      if ($headers->{Status} =~ /^2/) { 
        print "download [$job] succeeded.\n"; 
      } else { 
        print "download [$job] failed.\n"; 
      }
      $cv->send; # notification of download complete/exit.
    };

    $cv->recv; # wait for download to complete/exit before returning to pool
  }; if ($@) {
    print "worker payload error: $@\n";
  }
  return 1;
});

# dispatch the full list of downloads
my ($need,$done) = ($num_jobs, 0);
for my $job (0 .. ($need - 1)) {
  print "dispatching job $job...\n";
  $workers->do($job, sub {
    print "worker [$job] payload threw exception: $@\n" if $@;
    print "worker [$job] payload completed successfully!\n" unless $@;
    EV::unloop if ++$done == $need;
  });
}

EV::loop; # wait here for all downloads to complete
print "We're done!\n"; # some useful code to follow here...

演示输出如下:

user@host:~$ ./test.pl
dispatching job 0...
dispatching job 1...
dispatching job 2...
dispatching job 3...
dispatching job 4...
dispatching job 5...
dispatching job 6...
worker starting download [0] ...
worker starting download [1] ...
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
worker [6] payload threw exception: no worker connection
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60

^C
user@host:~$
user@host:~$
user@host:~$ download [1] failed.
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139.
  ...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145.

为什么选择AnyEvent::HTTP

在我的真实脚本中,我使用了更多的功能AnyEvent::HTTP;特别是,我将on_body回调与Term::StatusBar为脚本的最终用户显示进度条相结合;此外,我在on_body回调中策略性地“暂停”,以使传输速率等于或小于最终用户预定义的速率。

请随时提出具有这些功能的替代方案(或破解它们的简单方法!)

为什么选择AnyEvent::Worker::Pool

我已经很熟悉了。欢迎提供替代建议。

为什么是电动汽车

它很快。同样,欢迎提出替代建议。

4

1 回答 1

2

您不应该为此任务使用 AnyEvent::Worker::Poll。
我建议你不要使用循环特定的功能,如 EV::loop EV::unloop。这使您的代码与其他循环实现不兼容。

您的代码可能会这样重写

use strict;
use AnyEvent;
use AnyEvent::HTTP;

my $pool_size = 2;
my $num_jobs  = 7;
my $cur_job = 0;

my $cv = AnyEvent->condvar;
$cv->begin();

for (1..($pool_size < $num_jobs ? $pool_size : $num_jobs)) {
    $cv->begin();
    make_job($cur_job++);
}

$cv->end();

sub make_job {
    my $job = shift;
    $num_jobs--;

    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
        my ($data, $headers) = @_;
        if ($headers->{Status} =~ /^2/) { 
            print "download [$job] succeeded.\n"; 
        } else { 
            print "download [$job] failed.\n"; 
        }

        if ($num_jobs > 0) {
            make_job($cur_job++);
        }
        else {
            $cv->end();
        }
    };
}

$cv->recv();
于 2013-12-15T18:23:07.020 回答