从熟悉的前题开始。
#! /usr/bin/env perl
use strict;
use warnings;
use 5.10.0; # for // (defined-or)
use IO::Handle;
use IO::Select;
use LWP::Simple;
use POSIX qw/ :sys_wait_h /;
use Socket;
全局常量控制程序执行。
my $DEBUG = 0;
my $EXIT_COMMAND = "<EXIT>";
my $NJOBS = 10;
要检查的 URL 每行到达一个套接字的工作者端。对于每个 URL,worker 调用LWP::Simple::head
以确定资源是否可获取。然后,worker 将格式为url 的一行写回套接字:*status* 其中 *status* 是"YES"
or"NO"
和表示空格字符。
如果 URL 是$EXIT_COMMAND
,则工作人员立即退出。
sub check_sites {
my($s) = @_;
warn "$0: [$$]: waiting for URL" if $DEBUG;
while (<$s>) {
chomp;
warn "$0: [$$]: got '$_'" if $DEBUG;
exit 0 if $_ eq $EXIT_COMMAND;
print $s "$_: ", (head($_) ? "YES" : "NO"), "\n";
}
die "NOTREACHED";
}
要创建一个工作者,我们首先创建一个socketpair
. 父进程将使用一端,每个工人(子)将使用另一端。我们禁用两端的缓冲并将父端添加到我们的 IO::Select 实例。我们还记录了每个孩子的进程 ID,以便我们可以等待所有工作人员完成。
sub create_worker {
my($sel,$kidpid) = @_;
socketpair my $parent, my $kid, AF_UNIX, SOCK_STREAM, PF_UNSPEC
or die "$0: socketpair: $!";
$_->autoflush(1) for $parent, $kid;
my $pid = fork // die "$0: fork: $!";
if ($pid) {
++$kidpid->{$pid};
close $kid or die "$0: close: $!";
$sel->add($parent);
}
else {
close $parent or die "$0: close: $!";
check_sites $kid;
die "NOTREACHED";
}
}
为了分派 URL,父级获取尽可能多的可用阅读器,并从作业队列中分发相同数量的 URL。在作业队列为空后剩余的任何工作人员都会收到退出命令。
请注意,print
如果底层工作人员已经退出,则会失败。父母必须忽略SIGPIPE
以防止立即终止。
sub dispatch_jobs {
my($sel,$jobs) = @_;
foreach my $s ($sel->can_write) {
my $url = @$jobs ? shift @$jobs : $EXIT_COMMAND;
warn "$0 [$$]: sending '$url' to fd ", fileno $s if $DEBUG;
print $s $url, "\n" or $sel->remove($s);
}
}
到时间控制达到read_results
时,工人已经创建并接受了工作。现在父母用来can_read
等待一个或多个工人的结果到达。定义的结果是当前工作人员的回答,未定义的结果意味着孩子已经退出并关闭了套接字的另一端。
sub read_results {
my($sel,$results) = @_;
warn "$0 [$$]: waiting for readers" if $DEBUG;
foreach my $s ($sel->can_read) {
warn "$0: [$$]: reading from fd ", fileno $s if $DEBUG;
if (defined(my $result = <$s>)) {
chomp $result;
push @$results, $result;
warn "$0 [$$]: got '$result' from fd ", fileno $s if $DEBUG;
}
else {
warn "$0 [$$]: eof from fd ", fileno $s if $DEBUG;
$sel->remove($s);
}
}
}
父母必须跟踪现场工作人员以收集所有结果。
sub reap_workers {
my($kidpid) = @_;
while ((my $pid = waitpid -1, WNOHANG) > 0) {
warn "$0: [$$]: reaped $pid" if $DEBUG;
delete $kidpid->{$pid};
}
}
运行池执行上面的 subs 以调度所有 URL 并返回所有结果。
sub run_pool {
my($n,@jobs) = @_;
my $sel = IO::Select->new;
my %kidpid;
my @results;
create_worker $sel, \%kidpid for 1 .. $n;
local $SIG{PIPE} = "IGNORE"; # writes to dead workers will fail
while (@jobs || keys %kidpid || $sel->handles) {
dispatch_jobs $sel, \@jobs;
read_results $sel, \@results;
reap_workers \%kidpid;
}
warn "$0 [$$]: returning @results" if $DEBUG;
@results;
}
使用示例主程序
my @jobs = qw(
bogus
http://stackoverflow.com/
http://www.google.com/
http://www.yahoo.com/
);
my @results = run_pool $NJOBS, @jobs;
print $_, "\n" for @results;
输出是
假的:没有
http://www.google.com/:是的
http://stackoverflow.com/:是的
http://www.yahoo.com/:是的