
I have a few basic questions about using Python's multiprocessing module:

import multiprocessing

class Someparallelworkerclass(object):

    def __init__(self):
        self.num_workers = 4
        self.work_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()

    def someparallellazymethod(self):
        # Process.start() returns None, so keep the Process reference separately
        p = multiprocessing.Process(target=self.worktobedone)
        p.start()

    def worktobedone(self):
        # get data from work_queue
        # put back result in result_queue
        pass

Is it necessary to pass work_queue and result_queue as args to Process? Does the answer depend on the OS? The more fundamental question is: does the child process get a copied (COW) address space from the parent process, and hence know the definition of the class and its methods? If so, how does it know that the queues are meant to be shared for IPC, and that it shouldn't make duplicates of work_queue and result_queue in the child process? I tried searching online, but most of the documentation I found was vague and didn't go into enough detail about what exactly is happening underneath.


3 Answers


It's not actually necessary to include the queues in the args argument in this case, no matter what platform you're using. The reason is that even though it doesn't look like you're explicitly passing the two JoinableQueue instances to the child, you actually are - via self. Because self is explicitly passed to the child, and the two queues are part of self, they end up being passed along to the child.
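
As a minimal sketch (the class and function names here are made up, not from the question), both styles deliver the same queue to the child - implicitly through self, or explicitly through args:

import multiprocessing

def work_fn(q):
    q.put('reached the child via args')

class Worker(object):
    def __init__(self):
        self.work_queue = multiprocessing.JoinableQueue()

    def implicit(self):
        # the bound method drags self along, and self carries the queue
        multiprocessing.Process(target=self.work).start()

    def explicit(self):
        # the same queue, passed explicitly via args
        multiprocessing.Process(target=work_fn, args=(self.work_queue,)).start()

    def work(self):
        self.work_queue.put('reached the child via self')

if __name__ == '__main__':
    w = Worker()
    w.implicit()
    print(w.work_queue.get())  # 'reached the child via self'
    w.work_queue.task_done()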

On Linux, this implicit passing happens via os.fork(), which means that the file descriptors used by the multiprocessing.connection.Connection objects that the Queue uses internally for inter-process communication are inherited by the child (not copied). Other parts of the Queue become copy-on-write, but that's okay; multiprocessing.Queue is designed so that none of the parts that need to be copied actually need to stay in sync between the two processes. In fact, many of the internal attributes get reset after the fork occurs:

def _after_fork(self):
    debug('Queue._after_fork()')
    self._notempty = threading.Condition(threading.Lock())
    self._buffer = collections.deque()
    self._thread = None
    self._jointhread = None
    self._joincancelled = False
    self._closed = False
    self._close = None
    self._send = self._writer.send  # _writer is a multiprocessing.connection.Connection
    self._recv = self._reader.recv
    self._poll = self._reader.poll
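
To make the "inherited, not copied" point concrete, here's a small sketch (not part of the original answer): an item put before the child even exists is still readable in the child, because the child reads from the same underlying pipe:

import multiprocessing

def child(q):
    # blocks until the parent's feeder thread writes to the shared pipe,
    # then reads through the inherited file descriptor
    print(q.get())

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put('sent before the child started')
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    p.join()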

So that covers Linux. What about Windows? Windows doesn't have fork, so it needs to pickle self to send it to the child, and that includes pickling our Queues. Now, normally if you try to pickle a multiprocessing.Queue, it fails:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
    dict = getstate()
  File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

But this is actually an artificial limitation. multiprocessing.Queue objects can be pickled in some cases - how else could they be sent to child processes on Windows? And indeed, if we look at the implementation, we can see:

def __getstate__(self):
    assert_spawning(self)
    return (self._maxsize, self._reader, self._writer,
            self._rlock, self._wlock, self._sem, self._opid)

def __setstate__(self, state):
    (self._maxsize, self._reader, self._writer,
     self._rlock, self._wlock, self._sem, self._opid) = state
    self._after_fork()

__getstate__, which is called when pickling an instance, has an assert_spawning call in it, which makes sure we're actually spawning a process while attempting the pickle*. __setstate__, which is called when unpickling, is responsible for calling _after_fork.
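
A quick way to see the assert_spawning guard from the outside: the same queue that fails a bare pickle.dumps pickles happily while a process is being spawned. A sketch, using Python 3's multiprocessing.get_context to force the spawn (i.e. pickling) path even on Linux:

import multiprocessing

def child(q):
    q.put('I was pickled during spawning')

if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    q = ctx.Queue()
    # q is pickled here as part of spawning, so assert_spawning is satisfied
    p = ctx.Process(target=child, args=(q,))
    p.start()
    print(q.get())
    p.join()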

So how are the Connection objects the queues use internally maintained when we have to pickle? It turns out there's a multiprocessing sub-module that does exactly that - multiprocessing.reduction. The comment at the top of the module states it pretty clearly:

#
# Module to allow connection and socket objects to be transferred
# between processes
#

On Windows, the module ultimately uses the DuplicateHandle API provided by Windows to create a duplicate handle that the child process's Connection object can use. So while each process ends up with its own handle, they're exact duplicates - any action made on one is reflected in the other:

Duplicate handles refer to the same object as the original handle. Therefore, any changes to the object are reflected through both handles. For example, if you duplicate a file handle, the current file position is always the same for both handles.

* See this answer for more information about assert_spawning.

Answered 2014-10-07T17:06:01.833

The child process doesn't have the queues in its closure; its queue instances reference different areas of memory. When using queues the way you intend, you have to pass them as arguments to the function. One solution I like is to use functools.partial to curry your function with the queues you want, adding them permanently to its closure and letting you spin up multiple threads that perform the same task with the same IPC channel.
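
A minimal sketch of that suggestion (the worker function and queue setup below are made up for illustration):

import functools
import multiprocessing
import threading

def worker(work_queue, result_queue):
    item = work_queue.get()
    result_queue.put(item * 2)
    work_queue.task_done()

if __name__ == '__main__':
    work_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.JoinableQueue()
    # curry the queues into the function's arguments once
    bound = functools.partial(worker, work_queue, result_queue)

    for i in range(4):
        work_queue.put(i)
        threading.Thread(target=bound).start()

    work_queue.join()  # results 0, 2, 4, 6 now sit in result_queue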

Answered 2014-10-07T17:44:22.180

The child process doesn't get a copied address space. The child is a completely separate Python process with nothing shared. Yes, you have to pass the queues to the child. When you do, multiprocessing automatically handles the sharing via IPC. See https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes
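
The linked section demonstrates this pattern with an example along these lines (a sketch, not a verbatim quote):

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))  # the queue travels via args
    p.start()
    print(q.get())  # [42, None, 'hello']
    p.join()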

Answered 2014-10-06T22:26:51.940