1

我有一些代码,我希望输出为 1 和 6,但它却输出 1 ad infinitum。

use v5.10;
use Parallel::Prefork;
use List::MoreUtils qw( natatime );
use POSIX qw( ceil );

my $forks = 2;

my @numbers       = (1..10);
my $chunk_size    = ceil((scalar @numbers) / $forks);
my $game_iterator = natatime $chunk_size, @numbers;
my $fm            = Parallel::Prefork->new({ max_workers => $forks });

while ($fm->signal_received ne 'TERM') {
  while( my @numbers_chunk = $game_iterator->() ) { 
    $fm->start(sub {
        say $numbers_chunk[0];
    });
  }
}

$fm->wait_all_children;

# bash-4.2$ perl test.pl
# 1
# 1
# 1
# 1
# 1
# etc

上面的脚本将一个包含 10 个数字的数组拆分为 $fork 个数组 (2),并且应该将每个数组传递给它们自己的 fork 进行处理。

如果您替换$fm->start(sub {say $numbers_chunk[0];});为仅say $numbers_chunk[0];显示正确的结果。Parallel::ForkManager 还输出正确的结果(按照概要),所以我不知道我做错了什么,或者这是否是模块中的错误。

输出预期结果的 ForkManager 脚本:

use v5.10;
use Parallel::ForkManager;
use List::MoreUtils qw( natatime );
use POSIX qw( ceil );

my $forks = 2;

my @numbers       = (1..10);
my $chunk_size    = ceil((scalar @numbers) / $forks);
my $game_iterator = natatime $chunk_size, @numbers;
my $fm            = Parallel::ForkManager->new($forks );

while( my @numbers_chunk = $game_iterator->() ) { 
  $fm->start and next;
  say $numbers_chunk[0];
  $fm->finish;
}

$fm->wait_all_children;


# bash-4.2$ perl test.pl
# 1
# 6
4

2 回答 2

2

与文档相反,Parallel::Prefork 与 Parallel::ForkManager非常不同。它的设计目的是供网络服务器之类的东西使用,它会加载一次配置,然后生成相同的子节点,直到它被信号关闭。

因此,start根据需要不断创建子进程,并且在捕获到终止整个进程的信号之前不会返回。

也就是说,可以通过使用before_fork.

use strict;
use warnings;
use v5.10;

use List::MoreUtils   qw( natatime );
use Parallel::Prefork qw( );
use POSIX             qw( ceil );

my $forks = 2;

my @numbers       = (1..10);
my $chunk_size    = ceil(@numbers / $forks);
my $game_iterator = natatime($chunk_size, @numbers);

my @numbers_chunk;

my $fm = Parallel::Prefork->new({
   max_workers => $forks,
   trap_signals => { TERM => 'TERM' },
   before_fork => sub {
      @numbers_chunk = $game_iterator->()
         or kill(TERM => $$);
   },
});

$fm->start(sub {
   say $numbers_chunk[0];
});

$fm->wait_all_children();

但是为什么不直接使用 Parallel::ForkManager 而不是强制 Parallel::Prefork 来模拟它呢?

于 2013-07-23T14:05:22.693 回答
1

Parallel::Prefork专为独立的无状态、可重新启动的工作进程而设计,这些进程不需要来自父进程的数据。该模块不提供将数据线程化到回调的工具,这使得设置通信通道(例如传递数字块)变得很尴尬。

与下面直接调用的更简单的程序相比fork,该模块似乎没有给你买任何东西。

#! /usr/bin/env perl

use strict;
use warnings;

use v5.10;
use List::MoreUtils qw( natatime );
use POSIX qw( ceil WNOHANG );

my $forks = 2;

my @numbers       = (1 .. 10);
my $chunk_size    = ceil(scalar @numbers / $forks);
my $game_iterator = natatime $chunk_size, @numbers;

for (1 .. $forks) {
  if (my @numbers_chunk = $game_iterator->()) {
    unless (fork // die "$0: fork: $!") {
      say $numbers_chunk[0];
      exit 0;
    }
  }
}

# wait for all child processes
my $pid;
do { $pid = waitpid -1, WNOHANG } while $pid > 0;

您可以通过使用 System V IPC 来解决 Parallel::Prefork 的设计约束,例如,使用如下代码中的消息队列。

#! /usr/bin/env perl

use strict;
use warnings;

use Parallel::Prefork;
use List::MoreUtils qw( natatime );
use POSIX qw( ceil );
use IPC::SysV qw(IPC_NOWAIT IPC_PRIVATE S_IRUSR S_IWUSR);
use IPC::Msg;
use Errno qw( ENOMSG );

my $forks = 3;

my @numbers       = (1 .. 20);
my $chunk_size    = ceil((scalar @numbers) / $forks);
my $game_iterator = natatime $chunk_size, @numbers;
my $fm            = Parallel::Prefork->new({ max_workers => $forks });

my $maxsize = 0;
my $msg = new IPC::Msg(IPC_PRIVATE, S_IRUSR | S_IWUSR);
while (my @numbers_chunk = $game_iterator->()) {
  my $chunk = join " ", @numbers_chunk;
  $msg->snd(1, $chunk) or die "$0: msgsnd: $!";
  $maxsize = length $chunk if length $chunk > $maxsize;
}

my $ppid = $$;

while ($fm->signal_received ne 'TERM') {
  $fm->start(sub {
    my $ok = $msg->rcv(my $buf, $maxsize, 1, IPC_NOWAIT);
    if (!$ok) {
      if ($!{ENOMSG}) {
        sleep 1;  # XXX: poor man's synchronization
        kill TERM => $ppid or die "$0: kill: $!";
        return;
      }
      die "$0: msgrcv: $!";
    }
    print "[$$]: got '$buf'\n";
  });
}

$fm->wait_all_children;

这个实现是可以通过的,因为所有进程都使用相同的全局消息队列对象。

样本输出:

[31198]:得到'8 9 10 11 12 13 14'
[31197]:得到'1 2 3 4 5 6 7'
[31200]:得到'15 16 17 18 19 20'

正如上面的代码所示,您确实需要一个比 Parallel::Prefork 提供的更适合您的问题的抽象。

于 2013-07-23T01:45:10.993 回答