1

我有 N 个 fork()ed 进程,每个进程都有 M 个线程(但当然,还有其他方法吗?)。线程应该等待作业被提交到某个队列中,并最终在同一个队列中发布一些结果。所有线程具有相同的优先级并尊重相同的数据流(deque() 一个任务, enque() 它的结果是一个新任务列表)。我应该如何进行队列实现?

我尝试使用IPC:: Shareable作为@queue快速重写Thread::Queue,其中enqueue()/dequeue()使用两级(线程/进程)锁,但结果是错误的(真的不能:share IPC::Shareable的绑定数组)。我想我不想让每个线程都携带自己的IPC::Shareable实例(不用想太多)?

我是否应该通过IPC::Msg冻结/解冻任务并使用信号量实现锁和 CV,保持每个线程IPC::Msg实例?在线程中直接实例化 IPC 原语听起来很奇怪(因此我尝试在线程级别:share IPC::Shareables)。

你们中的一些僧侣是否同时做到了正确和听起来?一个平面实现,没有中间件队列管理器等?你能分享一个想法,也许是一个模块?

谢谢!

PS 我宁愿不选择像 mpi、rabbitmq 等繁重的现有实现。

#!/usr/bin/perl
use strict;
use warnings;
use Carp qw/croak/;
use threads;
use threads::shared;

use insert::your::queue::impl::here;     

que_make_init();

my $numthreads = 4;
my $numprocs = 4;

sub runthreads 
{
    foreach (1..$numthreads) {
    @retcodes = map { $_->join } threads->create( sub {
        my $q = que_get_process_level_shared_instance();
        while ( (my $task = @q->get())) {
            @q->put(process($task));
        }
    });
    }
    #do_something(@retcodes) ...
    exit(0);

}

my %proctable = map {
        croak unless defined (my $pid = fork());
        runthreads if $pid == 0;
        $pid => -1;
    } (0..$numprocs-1);

#waitpid(), etc
__END__
4

1 回答 1

0

您可以使用 Cache::FastMmap。在每个线程/进程中使用相同的 share_file ,你就很好了。

窗体 POD:

“通过 mmap 文件实现的共享内存缓存。它的核心是用 C 语言编写的以提高性能。它使用 fcntl 锁定来确保多个进程可以同时安全地访问缓存。”

my $Cache = Cache::FastMmap->new(raw_values => 1, share_file => "/tmp/shared_file.shm");
于 2012-10-31T09:03:55.510 回答