5

我最近发现,如果我们使用创建一对父子连接对象multiprocessing.Pipe,并且如果obj我们试图通过管道发送的对象太大,我的程序会挂起而不会抛出异常或做任何事情. 请参阅下面的代码。(下面的代码使用numpy包来生成大量的浮点数。)

import multiprocessing as mp
import numpy as np

def big_array(conn, size=1200):
    a = np.random.rand(size)
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 1200])
    proc.start()
    print "Child process started."
    proc.join()
    print "Child process joined."
    a = parent_conn.recv()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

输出如下。

Main process started.
Child process started.
Child process trying to send array of 1200 floats.

它无限期地挂在这里。但是,如果我们尝试发送一个包含 1000 个浮点数的数组而不是 1200,那么程序将成功执行,并按预期输出以下输出。

Main process started.
Child process started.
Child process trying to send array of 1000 floats.
Child process joined.
Received the following object.
Type: <type 'numpy.ndarray'>. Size: 1000.
Press any key to continue . . .

这对我来说似乎是一个错误。该文档说明了以下内容。

send(obj) 将对象发送到连接的另一端,应使用 recv() 读取该对象。

对象必须是可腌制的。非常大的泡菜(大约 32 MB+,尽管取决于操作系统)可能会引发 ValueError 异常。

但是在我的运行中,甚至没有ValueError抛出异常,程序只是挂在那里。而且,1200长的numpy数组有9600字节大,肯定不超过32MB!这看起来像一个错误。有谁知道如何解决这个问题?

顺便说一句,我使用的是 64 位的 Windows 7。

4

1 回答 1

16

尝试移动join()到下方recv()

import multiprocessing as mp

def big_array(conn, size=1200):
    a = "a" * size
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 120000])
    proc.start()
    print "Child process started."
    print "Child process joined."
    a = parent_conn.recv()
    proc.join()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

但是我真的不明白为什么您的示例即使对于小尺寸也有效。我在想写入管道然后在不先从管道读取数据的情况下使进程加入会阻止连接。您应该首先从管道接收,然后加入。但显然它不会阻止小尺寸......?

编辑:来自文档(http://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming):

“一个会死锁的例子如下:”

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
于 2013-03-30T08:17:08.477 回答