我有 N 个 fork()ed 进程,每个进程都有 M 个线程(但当然,还有其他方法吗?)。线程应该等待作业被提交到某个队列中,并最终在同一个队列中发布一些结果。所有线程具有相同的优先级并尊重相同的数据流(deque() 一个任务, enque() 它的结果是一个新任务列表)。我应该如何进行队列实现?
我尝试使用IPC:: Shareable作为@queue快速重写Thread::Queue,其中enqueue()/dequeue()使用两级(线程/进程)锁,但结果是错误的(真的不能:share IPC::Shareable的绑定数组)。我想我不想让每个线程都携带自己的IPC::Shareable实例(不用想太多)?
我是否应该通过IPC::Msg冻结/解冻任务并使用信号量实现锁和 CV,保持每个线程IPC::Msg实例?在线程中直接实例化 IPC 原语听起来很奇怪(因此我尝试在线程级别:share IPC::Shareables)。
你们中的一些僧侣是否同时做到了正确和听起来?一个平面实现,没有中间件队列管理器等?你能分享一个想法,也许是一个模块?
谢谢!
PS 我宁愿不选择像 mpi、rabbitmq 等繁重的现有实现。
#!/usr/bin/perl
use strict;
use warnings;
use Carp qw/croak/;
use threads;
use threads::shared;
use insert::your::queue::impl::here;
que_make_init();
my $numthreads = 4;
my $numprocs = 4;
sub runthreads
{
foreach (1..$numthreads) {
@retcodes = map { $_->join } threads->create( sub {
my $q = que_get_process_level_shared_instance();
while ( (my $task = @q->get())) {
@q->put(process($task));
}
});
}
#do_something(@retcodes) ...
exit(0);
}
my %proctable = map {
croak unless defined (my $pid = fork());
runthreads if $pid == 0;
$pid => -1;
} (0..$numprocs-1);
#waitpid(), etc
__END__