3

所以,基本上我有一个非常大的数组,我需要从中读取数据。我希望能够并行执行此操作;然而,当我尝试时,我惨遭失败。为了简单起见,假设我有一个包含 100 个元素的数组。我的想法是将数组分成 10 个相等的部分并尝试并行读取它们(10 个是任意的,但我不知道我可以一次运行多少个进程,而 10 个似乎足够低)。我需要根据每个分区的读数返回一个计算(新数据结构),但我没有修改原始数组中的任何内容。

我没有完全尝试上述方法,而是尝试了一些更简单的方法,但我做错了,因为它在任何容量下都不起作用。所以,然后我尝试简单地使用子进程来推送到一个数组。下面的代码Time::HiRes用于查看使用分叉而不是不使用分叉来运行它的速度有多快,但我还没有到那个时候(当我有接近几百万个条目时,我将对其进行测试在我的数组中):

use strict;
use warnings;
use Time::HiRes;

print "Starting main program\n";

my %child;
my @array=();
my $counter=0;
my $start = Time::HiRes::time();

for (my $count = 1; $count <= 10; $count++) 
{
        my $pid = fork();

        if ($pid) 
        {
                $child{$pid}++;
        } 
        elsif ($pid == 0) 
        {
                addToArray(\$counter,\@array); 
                exit 0;
        } 
        else 
        {
                die "couldnt fork: $!\n";
        }
}

while (keys %child)
{
    my $pid = waitpid(-1,0);
    delete $child{$pid};
}

my $stop = Time::HiRes::time();
my $duration = $stop-$start;

print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "End of main program\n";

sub addToArray 
{
my $start=shift;
my $count=${$start};
${$start}+=10;
my $array=shift;

  for (my $i=$count; $i<$count +10; $i++)
    {
     push @{$array}, $i;
    }

  print scalar(@{$array})."\n";
}

注意:我用 push 代替了${$array}[$i]=$i,因为我意识到我$counter实际上并没有更新,所以这永远不会与这段代码一起工作。

我认为这不起作用,因为孩子都是原始程序的副本,而我实际上从未在“原始程序”中的数组中添加任何内容。在那张纸条上,我很困惑。同样,我实际上试图解决的实际问题是如何分区我的数组(其中包含数据)并尝试并行读取它们并根据我的读数返回计算(注意:我不会修改原始数组),但如果我不知道如何真正让我$counter更新,我将永远无法做到这一点。我也想知道如何让上面的代码做我想做的事情,但这是次要目标。

一旦我可以让我的计数器正确更新,是否有可能在更新之前启动另一个进程并且我实际上不会读取整个数组?如果是这样,我该如何解释?

请,任何帮助将不胜感激。我很沮丧/卡住了。我希望有一个简单的解决方法。提前致谢。

编辑:我尝试使用 Parallel::ForkManager,但无济于事:

#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(10);

for (my $count = 1; $count <= 10; $count++) 
{
    my $pid = $pm->start and next;   
    sub1(\$counter,\@array); 
    $pm->finish; # Terminates the child process       
}
  $pm->wait_all_children;  

我没有包括其他无关的东西,请参阅上面缺少的代码/子...再次,我们将不胜感激。我对此很陌生,需要有人握住我的手。我也尝试用run_on_startand做一些事情run_on_finish,但他们也没有工作。

4

4 回答 4

5

您的代码有两个问题:您的子进程不共享数据,如果分叉的进程共享数据,您将面临竞争条件。解决办法是use threads。通过在父线程中对数据进行分区,当然也可以通过不使用共享数据来消除竞争条件的任何可能性。

线程

Perl 中的线程行为类似于forking:默认情况下,没有共享内存。这使得使用线程非常容易。然而,每个线程都运行它自己的 perl 解释器,这使得线程非常昂贵。谨慎使用。

首先,我们必须通过 激活线程支持use threads。要启动一个线程,我们这样做threads->create(\&code, @args),它返回一个线程对象。然后代码将在单独的线程中运行,并使用给定的参数调用。线程执行完毕后,我们可以通过调用来收集返回值$thread->join。注意:线程代码的上下文由create方法确定,而不是由join.

:shared我们可以用属性标记变量。你$counter@array将是这方面的例子,但通常最好传递数据的显式副本而不是使用共享状态(免责声明:从理论的角度来看)。为了避免共享数据的竞争条件,您实际上必须$counter使用信号量来保护您的数据,但同样,不需要共享状态。

这是一个玩具程序,展示了如何使用线程来并行化计算:

use strict;
use warnings;
use threads;
use 5.010; # for `say`, and sane threads
use Test::More;

# This program calculates differences between elements of an array

my @threads;
my @array = (1, 4, 3, 5, 5, 10, 7, 8);
my @delta = ( 3, -1, 2, 0, 5, -3, 1 );

my $number_of_threads = 3;
my @partitions = partition( $#array, $number_of_threads );
say "partitions: @partitions";

for (my $lower_bound = 0; @partitions; $lower_bound += shift @partitions) {
  my $upper_bound = $lower_bound + $partitions[0];
  say "spawning thread with [@array[$lower_bound .. $upper_bound]]";
  # pass copies of the values in the array slice to new thread:
  push @threads, threads->create(\&differences, @array[$lower_bound .. $upper_bound]);
  # note that threads->create was called in list context
}

my @received;
push @received, $_->join for @threads; # will block until all are finished

is_deeply \@received, \@delta;
done_testing;

# calculates the differences. This doesn't need shared memory.
# note that @array could have been safely accessed, as it is never written to
# If I had written to a (unshared) variable, these changes would have been thread-local
sub differences {
  say "Hi from a worker thread, I have ", 0+@_, " elements to work on";
  return map $_[$_] - $_[$_-1], 1 .. $#_;
  # or more readable:
  # my @d;
  # for my $i (1 .. $#_) {
  #   push @d, $_[$i] - $_[$i-1];
  # }
  # return @d;
}

# divide workload into somewhat fair parts, giving earlier threads more work
sub partition {
  my ($total, $parts) = @_;
  my $base_size = int($total / $parts);
  my @partitions = ($base_size) x $parts;
  $partitions[$_-1]++ for 1 .. $total - $base_size*$parts;
  return @partitions;
}

关于线程数的说明:这应该取决于系统的处理器数量。如果您有四个内核,那么超过四个线程没有多大意义。

于 2013-04-28T21:32:53.877 回答
3

如果要在 fork 后使用子进程,则每个子进程都是自治的,并且在从主程序 fork 时在程序中拥有自己的数据副本。孩子对自己的记忆所做的改变对父母的记忆没有影响。如果需要,要么需要线程化 Perl 并使用线程,要么需要重新考虑——也许使用共享内存,但将 Perl 数据定位到共享内存中可能会很棘手。

因此,一种选择是在分叉之前将所有数据读入内存并让孩子们处理他们自己的数据副本。

根据问题的结构,另一种可能性可能是让每个孩子阅读并处理部分数据。如果每个孩子都必须有权访问所有数据,这将不起作用。

如果线程或进程都被捆绑在读取同一个文件中,那么您将通过线程或分叉获得多少速度尚不清楚。将数据放入内存可能最好被视为单线程(单任务)操作;一旦数据在内存中,并行性就会立即生效并产生好处。

于 2013-04-28T20:42:11.650 回答
0

有一些 CPAN 模块可以让您的生活更轻松。其中之一是Parallel::ForkManager,它是一个简单的并行处理分支管理器

于 2013-04-28T20:00:26.960 回答
0

所以,经过我的努力,这里是修复:

编辑:这并没有完成我想做的事情

#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

print "Starting main program\n";
my @array=();
my $counter=0;
my $start = Time::HiRes::time();
my $max_processes=20;
my $partition=10;
my $max_elements=100;

my $pm = Parallel::ForkManager->new($max_processes);

$pm->run_on_start( sub {
          my ($pid, $exit_code, $ident) = @_;
          sub1(\$counter,\@array);  
      });    

while ($counter < $max_elements)
{
    my $pid = $pm->start and next;   

    $pm->finish; # Terminates the child process     
}
  $pm->wait_all_children;

my $stop = Time::HiRes::time();
my $duration = $stop-$start;

print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "\nEnd of main program\n";

sub sub1 {
my $start=shift;
my $count=${$start};
${$start}+=$partition;
my $array=shift;

  for (my $i=$count; $i<$count + $partition; $i++)
{
     push @{$array}, $i;
}
     return @{$array};
}
于 2013-04-28T20:44:54.777 回答