3

我正在编写一个 Perl 脚本,它接收一个 URL 列表并检查它们是否存在。(请注意,我只关心它们是否存在;我关心它们的内容是什么。这是程序的重要部分。

use LWP::Simple qw($ua head);

if (head($url))
{
    $numberAlive ++;
}
else
{
    $numberDead ++;
}

现在程序运行良好;但是,我希望它运行得更快。因此,我正在考虑将其设为多线程。我假设我的程序的缓慢部分是为每个 URL 联系服务器;因此,我正在寻找一种在等待第一个响应时可以向列表中其他网页的 URL 发送请求的方法。我怎样才能做到这一点?据我所知,该head例程没有可以在服务器响应后调用的回调。

4

3 回答 3

7

从熟悉的前题开始。

#! /usr/bin/env perl

use strict;
use warnings;

use 5.10.0;  # for // (defined-or)

use IO::Handle;
use IO::Select;
use LWP::Simple;
use POSIX qw/ :sys_wait_h /;
use Socket;

全局常量控制程序执行。

my $DEBUG = 0;
my $EXIT_COMMAND = "<EXIT>";
my $NJOBS = 10;

要检查的 URL 每行到达一个套接字的工作者端。对于每个 URL,worker 调用LWP::Simple::head以确定资源是否可获取。然后,worker 将格式为url 的一行写回套接字*status* 其中 *status* 是"YES"or"NO"表示空格字符。

如果 URL 是$EXIT_COMMAND,则工作人员立即退出。

sub check_sites {
  my($s) = @_;

  warn "$0: [$$]: waiting for URL" if $DEBUG;

  while (<$s>) {
    chomp;
    warn "$0: [$$]: got '$_'" if $DEBUG;
    exit 0 if $_ eq $EXIT_COMMAND;
    print $s "$_: ", (head($_) ? "YES" : "NO"), "\n";
  }

  die "NOTREACHED";
}

要创建一个工作者,我们首先创建一个socketpair. 父进程将使用一端,每个工人(子)将使用另一端。我们禁用两端的缓冲并将父端添加到我们的 IO::Select 实例。我们还记录了每个孩子的进程 ID,以便我们可以等待所有工作人员完成。

sub create_worker {
  my($sel,$kidpid) = @_;

  socketpair my $parent, my $kid, AF_UNIX, SOCK_STREAM, PF_UNSPEC
    or die "$0: socketpair: $!";
  $_->autoflush(1) for $parent, $kid;

  my $pid = fork // die "$0: fork: $!";
  if ($pid) {
    ++$kidpid->{$pid};
    close $kid or die "$0: close: $!";
    $sel->add($parent);
  }
  else {
    close $parent or die "$0: close: $!";
    check_sites $kid;
    die "NOTREACHED";
  }
}

为了分派 URL,父级获取尽可能多的可用阅读器,并从作业队列中分发相同数量的 URL。在作业队列为空后剩余的任何工作人员都会收到退出命令。

请注意,print如果底层工作人员已经退出,则会失败。父母必须忽略SIGPIPE以防止立即终止。

sub dispatch_jobs {
  my($sel,$jobs) = @_;

  foreach my $s ($sel->can_write) {
    my $url = @$jobs ? shift @$jobs : $EXIT_COMMAND;
    warn "$0 [$$]: sending '$url' to fd ", fileno $s if $DEBUG;
    print $s $url, "\n" or $sel->remove($s);
  }
}

到时间控制达到read_results时,工人已经创建并接受了工作。现在父母用来can_read等待一个或多个工人的结果到达。定义的结果是当前工作人员的回答,未定义的结果意味着孩子已经退出并关闭了套接字的另一端。

sub read_results {
  my($sel,$results) = @_;

  warn "$0 [$$]: waiting for readers" if $DEBUG;
  foreach my $s ($sel->can_read) {
    warn "$0: [$$]: reading from fd ", fileno $s if $DEBUG;
    if (defined(my $result = <$s>)) {
      chomp $result;
      push @$results, $result;
      warn "$0 [$$]: got '$result' from fd ", fileno $s if $DEBUG;
    }
    else {
      warn "$0 [$$]: eof from fd ", fileno $s if $DEBUG;
      $sel->remove($s);
    }
  }
}

父母必须跟踪现场工作人员以收集所有结果。

sub reap_workers {
  my($kidpid) = @_;

  while ((my $pid = waitpid -1, WNOHANG) > 0) {
    warn "$0: [$$]: reaped $pid" if $DEBUG;
    delete $kidpid->{$pid};
  }
}

运行池执行上面的 subs 以调度所有 URL 并返回所有结果。

sub run_pool {
  my($n,@jobs) = @_;

  my $sel = IO::Select->new;
  my %kidpid;
  my @results;

  create_worker $sel, \%kidpid for 1 .. $n;

  local $SIG{PIPE} = "IGNORE";  # writes to dead workers will fail

  while (@jobs || keys %kidpid || $sel->handles) {
    dispatch_jobs $sel, \@jobs;

    read_results $sel, \@results;

    reap_workers \%kidpid;
  }

  warn "$0 [$$]: returning @results" if $DEBUG;
  @results;
}

使用示例主程序

my @jobs = qw(
  bogus
  http://stackoverflow.com/
  http://www.google.com/
  http://www.yahoo.com/
);

my @results = run_pool $NJOBS, @jobs;
print $_, "\n" for @results;

输出是

假的:没有
http://www.google.com/:是的
http://stackoverflow.com/:是的
http://www.yahoo.com/:是的
于 2012-07-22T00:55:40.600 回答
3

另一种选择是 HTTP::Async。

#!/usr/bin/perl
use strict;
use warnings;

use HTTP::Request;
use HTTP::Async;

my $numberAlive = 0;
my $numberDead  = 0;
my @urls = ('http://www.perl.com','http://www.example.xyzzy/foo.html');

my $async = HTTP::Async->new;

# you might want to wrap this in a loop to deal with @urls in batches
foreach my $url (@urls){   
  $async->add( HTTP::Request->new( HEAD => $url ) );
  }

while ( my $response = $async->wait_for_next_response ) {
  if ($response->code == 200){$numberAlive ++;}
  else{$numberDead ++;}
  }

print "$numberAlive Alive, $numberDead Dead\n";
于 2012-07-22T07:58:03.813 回答
1

基于 Worker 的并行化(使用您选择的线程或进程):

use strict;
use warnings;
use feature qw( say );
use threads;  # or: use forks;

use LWP::Simple        qw( head );
use Thread::Queue::Any qw( );

use constant NUM_WORKERS => 10;  # Or whatever.

my $req_q  = Thread::Queue::Any->new();
my $resp_q = Thread::Queue::Any->new();

my @workers;
for (1..NUM_WORKERS) {
   push @workers, async {
      while (my $url = $req_q->dequeue()) {
         my $is_alive = head($url) ? 1 : 0;
         $resp_q->enqueue($is_alive);
      }
   };
}

$req_q->enqueue($_) for @urls;

my ($alive, $dead);
for (1..@urls) {
   my $is_alive = $resp_q->dequeue();
   ++( $is_alive ? $alive : $dead );
}

$req_q->enqueue(undef) for @workers;
$_->join for @workers;

say $alive;
say $dead;
于 2012-07-22T06:36:06.387 回答