这种行为是意料之中的:当主线程退出时,所有其他线程也会退出。如果你不在乎,你可以$thread->detach
。否则,您必须手动操作$thread->join
它们,我们会这样做。
$thread->join
等待线程完成,并获取返回值(线程可以像子例程一样返回值,尽管上下文(list/void/scalar)必须在生成时固定)。
我们将detach
排队数据的线程:
threads->create(\&buildQueue)->detach for 1..5;
现在对于出列线程,我们将它们放入一个词法变量中(为什么要使用全局变量?),以便我们稍后可以将它们出列:
my @dequeue_threads = map threads->create(\&process_queue), 1 .. 3;
然后等待他们完成:
$_->join for @dequeue_threads;
我们知道分离的线程将在程序退出之前完成执行,因为出队线程退出的唯一方法是耗尽队列。
除了一个半的错误。你看,空队列和完成队列之间是有区别的。如果队列只是空的,则出队线程将阻塞,$queue->dequeue
直到它们得到一些输入。传统的解决方案dequeue
是定义他们获得的价值。undef
我们可以通过在队列中提供与从队列中读取的线程一样多的值来打破循环。更现代的版本Thread::Queue
有一个end
方法,可以dequeue
返回undef
所有后续调用。
问题是何时结束队列。在所有排队线程都退出之后,我们应该这样做。这意味着,我们应该手动等待它们。叹。
my @enqueueing = map threads->create(\&enqueue), 1..5;
my @dequeueing = map threads->create(\&dequeue), 1..3;
$_->join for @enqueueing;
$queue->enqueue(undef) for 1..3;
$_->join for @dequeueing;
并在sub dequeuing
: while(defined( my $item = $queue->dequeue )) { ... }
。
使用defined
测试修复了另一个错误:rand
可以返回零,尽管这不太可能并且会通过大多数测试。的约定rand
是它返回一个伪随机浮点数,介于包括零和排除某个上限之间:区间中的一个数字[0, x)
。界限默认为1
.
如果您不想手动加入排队线程,则可以使用信号量来表示完成。信号量是一种多线程原语,可以递增和递减,但不能低于零。如果减量操作会使下降计数低于零,则调用会阻塞,直到另一个线程提高计数。如果开始计数是1
,这可以用作阻塞资源的标志。
我们也可以从一个负值开始1 - $NUM_THREADS
,让每个线程递增该值,这样只有当所有线程都退出时,它才能再次递减。
use threads; # make a habit of importing `threads` as the first thing
use strict; use warnings;
use feature 'say';
use Thread::Queue;
use Thread::Semaphore;
use constant {
NUM_ENQUEUE_THREADS => 5, # it's good to fix the thread counts early
NUM_DEQUEUE_THREADS => 3,
};
sub enqueue {
my ($out_queue, $finished_semaphore) = @_;
my $tid = threads->tid;
# iterate over ranges instead of using the while($maxval --> 0) idiom
for (1 .. 1000) {
$out_queue->enqueue(my $val = rand 10_000);
say "Thread $tid enqueued $val";
}
$finished_semaphore->up;
# try a non-blocking decrement. Returns true only for the last thread exiting.
if ($finished_semaphore->down_nb) {
$out_queue->end; # for sufficiently modern versions of Thread::Queue
# $out_queue->enqueue(undef) for 1 .. NUM_DEQUEUE_THREADS;
}
}
sub dequeue {
my ($in_queue) = @_;
my $tid = threads->tid;
while(defined( my $item = $in_queue->dequeue )) {
say "thread $tid dequeued $item";
}
}
# create the queue and the semaphore
my $queue = Thread::Queue->new;
my $enqueuers_ended_semaphore = Thread::Semaphore->new(1 - NUM_ENQUEUE_THREADS);
# kick off the enqueueing threads -- they handle themself
threads->create(\&enqueue, $queue, $enqueuers_ended_semaphore)->detach for 1..NUM_ENQUEUE_THREADS;
# start and join the dequeuing threads
my @dequeuers = map threads->create(\&dequeue, $queue), 1 .. NUM_DEQUEUE_THREADS;
$_->join for @dequeuers;
如果线程似乎不是并行运行,而是顺序运行,请不要感到惊讶:此任务(将随机数排队)非常快,并且不适合多线程(排队比创建随机数更昂贵)。
这是一个示例运行,其中每个 enqueuer 只创建两个值:
Thread 1 enqueued 6.39390993005694
Thread 1 enqueued 0.337993319585337
Thread 2 enqueued 4.34504733960242
Thread 2 enqueued 2.89158054485114
Thread 3 enqueued 9.4947585773571
Thread 3 enqueued 3.17079715055542
Thread 4 enqueued 8.86408863197179
Thread 5 enqueued 5.13654995317669
Thread 5 enqueued 4.2210886147538
Thread 4 enqueued 6.94064174636395
thread 6 dequeued 6.39390993005694
thread 6 dequeued 0.337993319585337
thread 6 dequeued 4.34504733960242
thread 6 dequeued 2.89158054485114
thread 6 dequeued 9.4947585773571
thread 6 dequeued 3.17079715055542
thread 6 dequeued 8.86408863197179
thread 6 dequeued 5.13654995317669
thread 6 dequeued 4.2210886147538
thread 6 dequeued 6.94064174636395
您可以看到5
之前设法将一些东西排入队列4
。线程7
并8
没有让任何东西出队,6
太快了。此外,在产生出队之前,所有入队都已完成(对于如此少量的输入)。