3

我正在尝试使用 Parallel::ForkManager 来控制一些子进程。我想将同时运行的进程数限制为 10。我总共需要运行 20 个。

我知道我可以在对象声明的第一行将进程限制设置为 10,但我也使用 $pm 对象来运行执行不同操作的子进程(当前函数占用更多内存,因此需要加以限制)。

我目前的代码不起作用,永远不会调用完成时运行,所以剩下的 10 个孩子永远不会被分叉。我不明白为什么会这样——我原以为孩子在退出时仍会调用完成代码,并减少计数,但“if”语句似乎阻止了这一点。有人可以解释为什么会这样吗?

谢谢你的帮助!

# Parallel declarations
my $pm = Parallel::ForkManager->new(30);

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_str_ref) = @_; 
    --$active_jobs;
    })

my $total_jobs = 0;
my $active_jobs = 0;
while( $total_jobs < 20) {
    sleep 300 and next if $active_jobs > 10; 

    my $pid = $pm->start and ++$active_p1_jobs and ++$total_p1_jobs and next;

    my $return = module::function(%args);

    $pm->finish(0, { index => $total_jobs, return => $return }); 
    }

print STDERR "Submitted all jobs, now waiting for children to exit.\n";
$pm->wait_all_children();
4

1 回答 1

3

我将把限制为 10 个的工作称为“类型 2”。

这就是我使用 P::FM 的方式:

use strict;
use warnings;

use List::Util            qw( shuffle );
use Parallel::ForkManager qw( );
use POSIX                 qw( WNOHANG );
use Time::HiRes           qw( sleep );

use constant MAX_WORKERS       => 30;
use constant MAX_TYPE2_WORKERS => 10;

sub is_type2_job { $_[0]{type} == 2 }

my @jobs = shuffle(
   ( map { { type => 1, data => $_ } } 0..19 ),
   ( map { { type => 2, data => $_ } } 0..19 ),
);

my $pm = Parallel::ForkManager->new(MAX_WORKERS);

my $type2_count = 0;
$pm->run_on_finish(sub {
   my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $job) = @_;
   --$type2_count if is_type2_job($job);
   print "Finished: $pid, $job->{type}, $job->{data}, $job->{result}\n";
});

my @postponed_jobs;
while (@postponed_jobs || @jobs) {
   my $job;
   if (@postponed_jobs && $type2_count < MAX_TYPE2_WORKERS) {
      $job = shift(@postponed_jobs);
   }
   elsif (@jobs) {
      $job = shift(@jobs);
      if ($type2_count >= MAX_TYPE2_WORKERS && is_type2_job($job)) {
         push @postponed_jobs, $job;
         redo;
      }
   }
   # elsif (@postponed_jobs) {
   #     # Already max type 2 jobs being processed,
   #     # but there are idle workers.
   #     $job = shift(@postponed_jobs);
   # }
   else {
      local $SIG{CHLD} = sub { };
      select(undef, undef, undef, 0.300);
      $pm->wait_one_child(WNOHANG);
      redo;
   }

   ++$type2_count if is_type2_job($job);

   my $pid = $pm->start and next;
   $job->{result} = $job->{data} + 100;  # Or whatever.
   $pm->finish(0, $job);
}

$pm->wait_all_children();

但这被打破了。选择下一项工作的代码应该在中间完成start(即在它等待孩子完成之后,但在它分叉之前),而不是在start. 这可能会导致作业无序运行。这不是我第一次希望 P::FM 有一个 pre-fork 回调。也许你可以向维护者要一个。

于 2012-07-16T16:00:15.047 回答