1

使用以下代码,似乎传递给工作人员的队列实例未初始化:

from multiprocessing import Process
from multiprocessing.queues import Queue

class MyQueue(Queue):

    def __init__(self, name):
        Queue.__init__(self)
        self.name = name

def worker(queue):
    print queue.name

if __name__ == "__main__":
    queue = MyQueue("My Queue")
    p = Process(target=worker, args=(queue,))
    p.start()
    p.join()

这抛出:

... line 14, in worker
    print queue.name
AttributeError: 'MyQueue' object has no attribute 'name'

我无法重新初始化队列,因为我会丢失 queue.name 的原始值,甚至将队列的名称作为参数传递给工作人员(这应该可以,但它不是一个干净的解决方案)。

那么,如何从 multiprocessing.queues.Queue 继承而不会出现此错误?

4

1 回答 1

4

在 POSIX 上,Queue对象通过简单的继承共享给子进程。 *

在 Windows 上,这是不可能的,所以它必须腌制Queue,通过管道将其发送给孩子,然后取消腌制。

(这可能并不明显,因为如果你真的尝试腌制 a Queue,你会得到一个异常,RuntimeError: MyQueue objects should only be shared between processes through inheritance。如果你查看源代码,你会发现这真的是一个谎言 - 如果你尝试腌制它只会引发这个异常a Queuewhenmultiprocess不在生成子进程的中间。)

当然,一般的酸洗和解酸不会有任何好处,因为你最终会得到两个相同的队列,而不是两个进程中的同一个队列。因此,multiprocessing通过添加对象在 unpickling 时使用的机制来扩展一些东西register_after_fork。** 如果您查看的源代码Queue,您可以看到它是如何工作的。

但是你真的不需要知道它是如何挂钩的。你可以像任何其他类的酸洗一样钩住它。例如,这应该有效:***

def __getstate__(self):
    return self.name, super(MyQueue, self).__getstate__()

def __setstate__(self, state):
    self.name, state = state
    super(MyQueue, self).__setstate__(state)

有关更多详细信息,pickle文档解释了您可以影响类腌制方式的不同方式。

(如果它不起作用,而且我没有犯愚蠢的错误......那么你必须至少知道一点关于它是如何工作的......但很可能只是想弄清楚之前是否要做额外的工作或在 之后_after_fork(),只需要交换最后两行......)


* 我不确定是否真的保证在 POSIX 平台上使用简单的分叉继承。这恰好在 2.7 和 3.3 上是正确的。但是有一个分支multiprocessing在所有平台上都使用 Windows 风格的 pickle-everything 以保持一致性,另一个分支在 OS X 上使用混合以允许CoreFoundation在单线程模式下使用,或者类似的方式,显然这种方式是可行的.

** 实际上,我认为 Queue只是register_after_fork为了方便而使用,并且可以在没有它的情况下重写......但这取决于Pipe_after_fork在 Windows 或LockPOSIXBoundedSemaphore上的魔力。

*** 这只是正确的,因为我碰巧知道,通过阅读源代码,这Queue是一个新样式的类,不会覆盖__reduce__or __reduce_ex,并且永远不会从__getstate__. 如果您不知道这一点,则必须编写更多代码。

于 2013-09-19T23:52:57.213 回答