0

我正在尝试在我的程序中创建子程序的线程(称为重组)。我使用下面的代码来创建线程,我改编自http://chicken.genouest.org/perl/multi-threading-with-perl/我使用了这段代码,因为这些线程是在循环中创建的,使用取决于变量 $ParentTally 的线程数,每个循环都不同($ParentTally 最多可以有 1000 个,我不想一次运行 1000 个线程)

my $nb_process = 20;
my $nb_compute = $ParentTally;
my $i=0;
my @running = ();
my @Threads;
my %NewPopulation;

while (scalar @Threads < $nb_compute) {
    @running = threads->list(threads::running);
    if (scalar @running < $nb_process) {
        my $Offspring= threads->new (sub {Recombination(\%Parent1Chromosome, \%Parent2Chromosome)});
        push (@Threads, $Offspring);
        my $tid = $Offspring->tid;
    }

    @running = threads->list(threads::running);
    foreach my $thr (@Threads) {
        if ($thr->is_running()) {
           my $tid = $thr->tid;
        }
       elsif ($thr->is_joinable()) {
          my $tid = $thr->tid;
          my $Offspring1=$thr->join();
          $NewPopulation{$Offspring1}{'Tally'}+=1;
       }
    }
    @running = threads->list(threads::running);
    $i++;
}

while (scalar @running != 0) {
     foreach my $thr (@Threads) {
      if ($thr->is_joinable()){
         my $Offspring1=$thr->join(); 
         $NewPopulation{$Offspring1}{'Tally'}+=1;
      }
    }
    @running = threads->list(threads::running);
}

(注意:$ParentTally 取自代码前面的另一个哈希值,我的 $ParentTally=$hashref->{'Tally'}; 所以这部分程序每次循环使用不同的 $ParentTally 值。%Parent1Chromosome & %Parent2Chromosome 是在程序前面创建的。子程序“重组”很长,所以我没有发布它,但它返回一个整数。)

通常在运行程序时(尽管并非总是如此,许多早期的代码都依赖于随机变量,因此程序永远不会运行相同的)一旦它完成我得到'Perl exited with active threads:'number'finished and unjoined'( “数字”因运行而异)。我以为:

 while (scalar @running != 0) {
     foreach my $thr (@Threads) {
      if ($thr->is_joinable()){
         my $Offspring1=$thr->join(); 
         $NewPopulation{$Offspring1}{'Tally'}+=1;
      }
    }

是否意味着所有线程都会在进入下一段代码之前完成?我究竟做错了什么?(我以前从未使用过线程)。我曾研究过使用http://www.perlmonks.org/?node_id=735931但我并不真正了解如何使用 Thread::Queue 并且无法找到教程(并且不了解http ://perldoc.perl.org/Thread/Queue.html)。谢谢

4

2 回答 2

4

不是对您的代码的修复,但这里是我如何使用队列的大纲(显然需要一些填充以适应您的目的)。如果内存使用存在问题,有很多方法可以改进这一点 - 每个生成的线程都获取所有范围内变量的完整副本;使用线程时很容易遇到内存问题

#!/usr/bin/perl

use strict ;
use threads ;
use Thread::Queue ;

my $threadCount = 2 ;

my $DataQueue = Thread::Queue->new() ;
my $ReportQueue = Thread::Queue->new() ;

my $threads = [] ;

# create pool of worker threads
for ( my $i = 0 ; $i<$threadCount ; $i ++ ){
    push( @$threads, threads->create( \&doStuff, $DataQueue, $ReportQueue ) ) ;
}

# array of data on which the threads have to work
my @array ;
# put work onto queue for threads to process
foreach my $workItem ( @array ){
   $DataQueue->enqueue( $workItem );
}

# enqueue undef for each worker to tell it no more work
# then wait for them all to join
$DataQueue->enqueue( (undef) x $threadCount ) ;
$_->join for @$threads ;

my %NewPopulation ;
# read the output of the threads from ReportQueue
while ( my $reportItem = $ReportQueue->dequeue() ){
    $NewPopulation{$reportItem}{'Tally'}++ ;
}

# display tallys
for my $offspring ( keys %NewPopulation ){
    print "Offspring $offspring Tally => " . $NewPopulation{$offspring}{'Tally'} . "\n" ;
}

sub doStuff{
    my ( $DataQueue, $ReportQueue ) = @_ ;

    while ( my $inputHash = $DataQueue->dequeue() ){
        my $result ;
        # do things here - the logic in your Recombination sub

        # return result to report queue
        $ReportQueue->enqueue($result) ;
    }

   # Enqueue undef to report queue so report thread knows we're done
   $ReportQueue->enqueue( undef ) ;

}
于 2012-06-20T15:33:15.240 回答
2

我相信错误在最后一个while循环中:

while (scalar @running != 0) {
     foreach my $thr (@Threads) {
      if ($thr->is_joinable()){
         my $Offspring1=$thr->join(); 
         $NewPopulation{$Offspring1}{'Tally'}+=1;
      }
    }
    @running = threads->list(threads::running);
}

根据线程文档is_joinable,仅当线程已完成运行、尚未分离且尚未加入时,调用才会返回 true。我的猜测是,当您进入本节时,您仍然有正在运行的线程,因此您跳过它们。您可能会像在上一个while循环中所做的那样进行另一个调用,is_running以查看您是否仍有线程在运行并以某种方式处理线程。

于 2012-06-20T15:10:10.200 回答