7

我一直在编写一个“检查器”系统,它对各种服务、系统、数据库、文件等执行各种“检查”。“检查”本质上是通用的,可以是任何东西。所有检查都以通用格式报告,无论它们是通过还是失败。

它以模块化的 OO 方式编写,因此开发人员可以简单地遵循框架并独立地编写检查。每个对象都包含一个共享的报告对象,在他们运行检查后,他们只需 $self->{'reporting'}->report(params)。定义了参数,并假定开发人员进行了适当的报告。报告对象然后索引这些报告。我的主加载程序脚本具有如下条目:

my $reportingObject = new Checks::Reporting(params);
my @checks;

push @checks, new Checks::Check_One($reportingObject, params));
push @checks, new Checks::Check_One($reportingObject, params));
.
.
push @checks, new Checks::Check_N($reportingObject, params));

完成检查并完成报告后,我一直在做:

foreach my $check (@checks) {
    $check->run_stuff();
}

$reportingObject->finalize_report();

现在,由于这些检查是完全独立的(不用担心报告对象),它们可以并行运行。作为我所做的改进:

my @threads;
foreach my $check (@checks) {
    push @threads, async { $check->run_stuff(); }
}

foreach my $thread (@threads) {
    $thread->join;
}

#All threads are complete, and thus all checks are done
$reportingObject->finalize_report();

正如我之前所说,开发人员将彼此独立地编写检查。有些检查很简单,有些则不然。简单检查中可能没有异步代码,但其他检查可能需要在内部异步运行,例如

sub do_check {
   my @threads;
   my @list = @{$self->{'list'}};

   foreach my $item (@list) {
      push @threads, async { 
                   #do_work_on_$item
                   #return 1 or 0 for success or fail
               };
      foreach my $thread (@threads) {
          my $res =  $thread->join;
          if($res == 1) {
              $self->{'reporting'}->report(params_here);
          }
      }
   }
}

正如你所看到的,线程模型允许我用非常模糊的术语来做事。每个“检查”无论是什么,都在自己的线程中独立运行。如果一个开发人员有异步的事情要做,不管是什么,他只是在自己的线程中独立地做。我想要一个类似的模型。

不幸的是,线程很慢而且效率低下。所有异步库都有特定的观察者,例如 IO 等。我不想要任何特定的东西。我想要一个基于事件的模型,它允许我简单地启动异步任务,无论它们是什么,并在它们全部完成时简单地通知,这样我就可以继续前进。

希望这可以解释它,您可以为我指明正确的方向。

4

1 回答 1

6

这似乎很适合老板-工人模型:

  • 在程序开始时产生一些工人。确保他们都可以访问队列。

  • 将尽可能多的检查排入队列。工作人员将检查出列,执行它们,并将结果排入输出队列。

  • 你的主线程查看来自输出线程的结果,然后做它想做的任何事情。

  • 加入一个END街区的工人

您可能想看看Thread::Queue::Any是否有机会将 coderefs 放入队列中。

这是一个完全可运行的示例:

use strict; use feature 'say';
use threads; use threads::shared; use Thread::Queue::Any;
use constant NUM_THREADS => 5;
local $Storable::Deparse = 1; local $Storable::Eval = 1;  # needed to serialize code

my $check_q  = Thread::Queue::Any->new;
my $result_q = Thread::Queue::Any->new;

# start the workers
{
  my $running :shared = NUM_THREADS;
  my @threads  = map threads->new(\&worker, $check_q, $result_q, \$running), 1..NUM_THREADS;

  END { $_->join for @threads }
}

# enqueue the checks
$check_q->enqueue($_) for sub {1}, sub{2}, sub{"hi"}, sub{ die };
$check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue

while(defined( my $result = $result_q->dequeue )) {
  report($$result);
}

sub report {
  say shift // "FAILED";
}

sub worker {
  my ($in, $out, $running_ref) = @_;
  while (defined( my $check = $in->dequeue )) {
    my $result = eval { $check->() };
    $out->enqueue(\$result);
  }

  # last thread closes the door
  lock $$running_ref;
  --$$running_ref || $out->enqueue(undef);
}

这打印

1
2
hi
FAILED

以稍微随机的顺序。

于 2013-06-26T16:31:09.117 回答