12

直接来自 Python文档

类 multiprocessing.Queue([maxsize])

...

qsize() 返回队列的大致大小。由于多线程/多处理语义,这个数字是不可靠的。

empty() 如果队列为空,则返回 True,否则返回 False。由于多线程/多处理语义,这是不可靠的。

而且我凭经验发现这对于 是完全正确的Queue,尤其是对于empty()

在我的代码中,我有一堆进程(每个进程都是同一个主进程的子进程),每个进程的run方法中都有以下内容:

while self.active:
    if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
        try:
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except Queue.Empty as empty_queue:
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)

基本上,该进程等待general_queue工作,但首先检查它的exclusive_queue. 主进程可以将任务放入进程的通用队列或独占队列中。现在,在 中if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0),我首先使用了 a self.exclusive_queue.empty(),这导致了非常奇怪的行为(qsize()30+ 和empty() = True)。

所以我要去的地方是 - 因为multiprocessing.queues.SimpleQueue在文档中是这样写的:

empty() 如果队列为空,则返回 True,否则返回 False。

根本没有提到可靠性。SimpleQueue.empty() 可靠吗?

其次是multiprocessing.JoinableQueue可靠还是比机制“更”Queue可靠task_done()

这种方法可以被认为是正确的,还是带有回调的方法(通过孩子之间的共享管道端点)更合适?

4

1 回答 1

7

不是一个直接的答案,但我已经开始越来越依赖于使用保护条件迭代输入队列。多处理模块的文档中有一个示例:

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

因此,当您对队列的输入完成时,您只需将与启动进程put一样多的STOP字符串或您选择的任何守卫添加到队列中。

于 2013-02-18T14:22:31.550 回答