1

我正在尝试基于稍微改变的老板/工人模型来实现多线程应用程序。基本上,主线程创建了几个老板线程,每个老板线程又产生两个工作线程(可能更多)。这是因为老板线程每个处理一个主机或网络设备,工作线程可能需要一段时间才能完成工作。

Thread::Pool用来实现这个概念,到目前为止效果很好;我也不认为我的问题与Thread::Pool(见下文)有关。前面非常简化的伪代码:

use strict;
use warnings;

my $bosspool = create_bosspool();   # spawns all boss threads
my $taskpool = undef;               # created in each boss thread at
                                    # creation of each boss thread 

# give device jobs to boss threads
while (1) {
  foreach my $device ( @devices ) {
    $bosspool->job($device);
  }

  sleep(1);
}

# This sub is called for jobs passed to the $bosspool
sub process_boss
{
  my $device = shift;

  foreach my $task ( $device->{tasks} ) {
    # process results as they become available
    process_result() while ( $taskpool->results );
    # give task jobs to task threads
    scalar $taskpool->job($device, $task);
    sleep(1); ### HACK ###
  }

  # process remaining results / wait for all tasks to finish
  process_result() while ( $taskpool->results || $taskpool->todo );

  # happy result processing
}

sub process_result
{
  my $result = $taskpool->result_any();

  # mangle $result
}

# This sub is called for jobs passed to the $taskpool of each boss thread
sub process_task
{
  # not so important stuff

  return $result;
}

顺便说一句,我不使用monitor()-routine 的原因是因为我必须等待所有工作$taskpool完成。现在,除非您删除该### HACK ###行,否则此代码的效果非常好。不睡觉,$taskpool->todo()如果您添加它们或接收它们的结果太“快”,将无法提供正确数量的仍然打开的工作。就像,您总共添加了 4 个作业,但$taskpool->todo()之后只会返回 2 个(没有待处理的结果)。这会导致各种有趣的效果。

好的,Thread::Pool->todo()废话,让我们尝试一种解决方法:

sub process_boss
{
  my $device = shift;

  my $todo = 0;

  foreach my $task ( $device->{tasks} ) {
    # process results as they become available
    while ( $taskpool->results ) {
      process_result();
      $todo--;
    }
    # give task jobs to task threads
    scalar $taskpool->job($device, $task);
    $todo++;
  }

  # process remaining results / wait for all tasks to finish
  while ( $todo ) {
    process_result();
    sleep(1); ### HACK ###
    $todo--;
  }
}

只要我保持### HACK ###线路,这也可以正常工作。如果没有这一行,此代码将重现 的问题Thread::Pool->todo(),因为$todo它不仅会减少 1,还会减少 2 甚至更多。

我只用一个老板线程测试了这段代码,所以基本上没有涉及多线程(当涉及到这个子例程时)。$bosspool$taskpool尤其$todo是没有:shared,没有副作用,对吧?这个子程序中发生了什么,它只由一个老板线程执行,没有共享变量、信号量等?

4

1 回答 1

0

我建议实现“工作者”线程模型的最佳方法是使用Thread::Queue. 做这样的事情的问题是确定队列何时完成,或者项目是否已出列并等待处理。

您可以使用Thread::Queuewhile 循环从队列和队列中获取元素,end以便 while 循环返回 undef 并且线程退出。

因此,您并不总是需要多个“老板”线程,您可以使用多种不同风格的worker输入队列。我会质疑为什么在那种情况下你需要一个“老板”线程模型。似乎没有必要。

参考: Perl daemonize with child daemons

#!/usr/bin/perl

use strict;
use warnings;
use threads;
use Thread::Queue;

my $nthreads = 4;

my @targets = qw ( device1 device2 device3 device4 );

my $task_one_q = Thread::Queue->new();
my $task_two_q = Thread::Queue->new();

my $results_q = Thread::Queue->new();

sub task_one_worker {
    while ( my $item = task_one_q->dequeue ) {

        #do something with $item

        $results_q->enqueue("$item task_one complete");
    }
}

sub task_two_worker {
    while ( my $item = task_two_q->dequeue ) {

        #do something with $item

        $results_q->enqueue("$item task_two complete");
    }
}

#start threads;

for ( 1 .. $nthreads ) {
    threads->create( \&task_one_worker );
    threads->create( \&task_two_worker );
}

foreach my $target (@targets) {
    $task_one_q->enqueue($target);
    $task_two_q->enqueue($target);
}

$task_one_q->end;
$task_two_q->end;

#Wait for threads to exit.

foreach my $thr ( threads->list() ) {
    threads->join();
}

$results_q->end();

while ( my $item = $results_q->dequeue() ) {
    print $item, "\n";
}

如果您愿意,您可以对线程执行类似的操作boss- 您可以创建一个队列boss并通过引用工作人员将其传递。我不确定这是否有必要。

于 2015-02-10T22:09:55.193 回答