我已经读了几次这个问题,我想我有点明白你想要做什么。你有一个控制脚本。这个脚本产生了孩子做一些事情,这些孩子产生了孙子来实际做这项工作。问题是孙子可能太慢(等待 STDIN 或其他),而您想杀死它们。此外,如果有一个缓慢的孙子,您希望整个孩子都死掉(如果可能,杀死其他孙子)。
所以,我尝试了这两种方式。第一个是让父进程在一个新的 UNIX 会话中生成一个子进程,将计时器设置为几秒钟,并在计时器关闭时终止整个子会话。这使得父母对孩子和孙子都负责。它也无法正常工作。
下一个策略是让父母产生孩子,然后让孩子负责管理孙辈。它会为每个孙子设置一个计时器,如果进程在到期时间之前没有退出,则将其终止。这很好用,所以这里是代码。
我们将使用 EV 来管理孩子和计时器,并使用 AnyEvent 来管理 API。(您可以尝试另一个 AnyEvent 事件循环,例如 Event 或 POE。但我知道 EV 在您告诉循环监视它之前正确处理了子退出的条件,这消除了其他循环容易受到的恼人竞争条件。)
#!/usr/bin/env perl
use strict;
use warnings;
use feature ':5.10';
use AnyEvent;
use EV; # you need EV for the best child-handling abilities
我们需要跟踪子观察者:
# active child watchers
my %children;
然后我们需要编写一个函数来启动孩子。父代产生的东西称为子代,子代产生的东西称为作业。
sub start_child($$@) {
my ($on_success, $on_error, @jobs) = @_;
参数是子进程成功完成时要调用的回调(意味着它的作业也成功),子进程未成功完成时的回调,然后是要运行的 coderef 作业列表。
在这个函数中,我们需要分叉。在父级中,我们设置了一个子级观察者来监视子级:
if(my $pid = fork){ # parent
# monitor the child process, inform our callback of error or success
say "$$: Starting child process $pid";
$children{$pid} = AnyEvent->child( pid => $pid, cb => sub {
my ($pid, $status) = @_;
delete $children{$pid};
say "$$: Child $pid exited with status $status";
if($status == 0){
$on_success->($pid);
}
else {
$on_error->($pid);
}
});
}
在孩子身上,我们实际上是在运行这些工作。不过,这涉及到一些设置。
首先,我们忘记了父母的孩子观察者,因为孩子被告知其兄弟姐妹退出是没有意义的。(fork 很有趣,因为你继承了父级的所有状态,即使这根本没有意义。)
else { # child
# kill the inherited child watchers
%children = ();
my %timers;
我们还需要知道所有工作何时完成,以及它们是否都成功了。我们使用计数条件变量来确定一切何时退出。我们在启动时递增,在退出时递减,当计数为 0 时,我们知道一切都已完成。
我还保留一个布尔值来指示错误状态。如果进程以非零状态退出,则错误为 1。否则,它保持为 0。您可能希望保持比这更多的状态 :)
# then start the kids
my $done = AnyEvent->condvar;
my $error = 0;
$done->begin;
(我们也从 1 开始计数,这样如果有 0 个作业,我们的进程仍然会退出。)
现在我们需要为每个作业分叉,并运行该作业。在父级中,我们做一些事情。我们增加 condvar。如果孩子太慢,我们会设置一个计时器来杀死孩子。我们设置了一个子观察者,所以我们可以知道作业的退出状态。
for my $job (@jobs) {
if(my $pid = fork){
say "[c] $$: starting job $job in $pid";
$done->begin;
# this is the timer that will kill the slow children
$timers{$pid} = AnyEvent->timer( after => 3, interval => 0, cb => sub {
delete $timers{$pid};
say "[c] $$: Killing $pid: too slow";
kill 9, $pid;
});
# this monitors the children and cancels the timer if
# it exits soon enough
$children{$pid} = AnyEvent->child( pid => $pid, cb => sub {
my ($pid, $status) = @_;
delete $timers{$pid};
delete $children{$pid};
say "[c] [j] $$: job $pid exited with status $status";
$error ||= ($status != 0);
$done->end;
});
}
使用计时器比警报更容易一点,因为它带有状态。每个计时器都知道要杀死哪个进程,并且当进程成功退出时很容易取消计时器——我们只需将其从哈希中删除即可。
那是(孩子的)父母。(孩子的;或工作的)孩子真的很简单:
else {
# run kid
$job->();
exit 0; # just in case
}
如果您愿意,也可以在此处关闭标准输入。
现在,在所有进程都生成之后,我们通过等待 condvar 来等待它们全部退出。事件循环将监视孩子和计时器,并为我们做正确的事情:
} # this is the end of the for @jobs loop
$done->end;
# block until all children have exited
$done->recv;
然后,当所有孩子都退出时,我们可以做任何我们想做的清理工作,比如:
if($error){
say "[c] $$: One of your children died.";
exit 1;
}
else {
say "[c] $$: All jobs completed successfully.";
exit 0;
}
} # end of "else { # child"
} # end of start_child
好的,这就是孩子和孙子/工作。现在我们只需要编写父级,这要容易得多。
像孩子一样,我们将使用计数 condvar 来等待我们的孩子。
# main program
my $all_done = AnyEvent->condvar;
我们需要做一些工作。这是一个总是成功的,一个如果你按下回车就会成功,但如果你让它被计时器杀死就会失败:
my $good_grandchild = sub {
exit 0;
};
my $bad_grandchild = sub {
my $line = <STDIN>;
exit 0;
};
因此,我们只需要启动子作业。如果你还记得回到顶部的方式start_child
,它需要两个回调,一个错误回调和一个成功回调。我们将设置它们;错误回调将打印“not ok”并减少 condvar,成功回调将打印“ok”并执行相同操作。很简单。
my $ok = sub { $all_done->end; say "$$: $_[0] ok" };
my $nok = sub { $all_done->end; say "$$: $_[0] not ok" };
然后我们可以让一群孩子拥有更多的孙子工作:
say "starting...";
$all_done->begin for 1..4;
start_child $ok, $nok, ($good_grandchild, $good_grandchild, $good_grandchild);
start_child $ok, $nok, ($good_grandchild, $good_grandchild, $bad_grandchild);
start_child $ok, $nok, ($bad_grandchild, $bad_grandchild, $bad_grandchild);
start_child $ok, $nok, ($good_grandchild, $good_grandchild, $good_grandchild, $good_grandchild);
其中两个将超时,两个将成功。但是,如果您在它们运行时按 enter,那么它们可能都会成功。
无论如何,一旦它们开始了,我们只需要等待它们完成:
$all_done->recv;
say "...done";
exit 0;
这就是程序。
我们没有做 Parallel::ForkManager 所做的一件事是“限制”我们的分叉,以便一次只有n
子运行。不过,这很容易手动实现:
use Coro;
use AnyEvent::Subprocess; # better abstraction than manually
# forking and making watchers
use Coro::Semaphore;
my $job = AnyEvent::Subprocess->new(
on_completion => sub {}, # replace later
code => sub { the child process };
)
my $rate_limit = Coro::Semaphore->new(3); # 3 procs at a time
my @coros = map { async {
my $guard = $rate_limit->guard;
$job->clone( on_completion => Coro::rouse_cb )->run($_);
Coro::rouse_wait;
}} ({ args => 'for first job' }, { args => 'for second job' }, ... );
# this waits for all jobs to complete
my @results = map { $_->join } @coros;
这里的好处是你可以在你的孩子运行时做其他事情——只需async
在你做阻塞连接之前产生更多的线程。您还可以通过 AnyEvent::Subprocess 对子进程进行更多控制——您可以在 Pty 中运行子进程并为其提供标准输入(如使用 Expect),您可以捕获其标准输入、标准输出和标准错误,或者您可以忽略那些东西,或者别的什么。你可以决定,而不是一些试图让事情变得“简单”的模块作者。
无论如何,希望这会有所帮助。