在这种情况下,实际上没有必要在args
参数中包含队列,无论您使用什么平台。原因是即使看起来你没有明确地将这两个JoinableQueue
实例传递给孩子,但实际上你是 - via self
。因为self
is明确地被传递给孩子,并且两个队列是 的一部分self
,所以它们最终被传递给孩子。
在 Linux 上,这是通过 发生的,这意味着内部用于进程间通信的对象os.fork()
使用的文件描述符由子进程继承(而不是复制)。其他部分become ,但没关系;旨在使需要复制的任何部分实际上都不需要在两个进程之间保持同步。事实上,许多内部属性在发生后会被重置:multiprocessing.connection.Connection
Queue
Queue
copy-on-write
multiprocessing.Queue
fork
def _after_fork(self):
debug('Queue._after_fork()')
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
self._jointhread = None
self._joincancelled = False
self._closed = False
self._close = None
self._send = self._writer.send # _writer is a
self._recv = self._reader.recv
self._poll = self._reader.poll
这涵盖了Linux。窗户怎么样?Windows 没有fork
,因此需要腌制self
才能将其发送给孩子,这包括腌制我们的Queues
. 现在,通常如果您尝试腌制 a multiprocessing.Queue
,它会失败:
>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/local/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
dict = getstate()
File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
assert_spawning(self)
File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
但这实际上是人为的限制。在某些情况下可以腌制multiprocessing.Queue
对象- 它们如何被发送到 Windows 中的子进程?事实上,如果我们看一下实现,我们可以看到:
def __getstate__(self):
assert_spawning(self)
return (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state):
(self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
__getstate__
,它在腌制一个实例时被调用,其中有一个assert_spawning
调用,它确保我们在尝试腌制时实际上是在生成一个进程*。__setstate__
,在 unpickling 时调用,负责调用_after_fork
.
那么Connection
当我们必须腌制时,队列使用的对象是如何维护的呢?事实证明,有一个multiprocessing
子模块可以做到这一点 - multiprocessing.reduction
。模块顶部的注释非常清楚地说明了这一点:
#
# Module to allow connection and socket objects to be transferred
# between processes
#
在 Windows 上,该模块最终使用 Windows 提供的DuplicateHandle API 来创建子进程Connection
对象可以使用的重复句柄。因此,虽然每个进程都有自己的句柄,但它们是完全相同的——对一个进程所做的任何操作都会反映在另一个进程上:
复制句柄指的是与原始句柄相同的对象。因此,对对象的任何更改都会通过两个句柄反映出来。例如,如果您复制一个文件句柄,则两个句柄的当前文件位置始终相同。
*有关更多信息,请参阅此答案assert_spawning