
In my application, I use pipes from the multiprocessing module to communicate between Python processes. Lately I've observed some odd behaviour, depending on the size of the data being sent through them. According to the Python documentation, these pipes are based on connections and should behave in an asynchronous manner, yet sometimes they get stuck on send. If I enable full duplex on each connection, everything works fine, even though I'm not using the connections both to send and to listen. Can anyone explain this behaviour?

  1. 100 floats, full duplex disabled
    The code works and takes advantage of the asynchronicity.
  2. 100 floats, full duplex enabled
    The example works fine, as expected.
  3. 10000 floats, full duplex disabled
    Despite everything being fine with the smaller data, execution blocks forever.
  4. 10000 floats, full duplex enabled
    Fine again.

The code (it's not my production code, it just illustrates what I mean):

from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)    # blocks forever in case 3 (10000 floats, half duplex)
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    # Pipe(False) returns (receive-only end, send-only end)
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    # rotate the deques so each process sends down one pipe
    # and receives from a different one
    pipes_in.rotate(1)
    pipes_out.rotate(-1)

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()

1 Answer


First of all, it's worth noting the implementation of the multiprocessing.Pipe function...

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    if duplex:
        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
    else:
        fd1, fd2 = os.pipe()
        c1 = _multiprocessing.Connection(fd1, writable=False)
        c2 = _multiprocessing.Connection(fd2, readable=False)

    return c1, c2

The difference being that half-duplex 'Pipes' use an anonymous pipe, whereas full-duplex 'Pipes' actually use a Unix domain socket, since anonymous pipes are unidirectional by nature.
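You can see the distinction from within Python by checking what kind of file the underlying descriptor refers to. A minimal sketch, assuming a Unix system:

import os
import stat
from multiprocessing import Pipe

r, w = Pipe(False)     # half duplex
c1, c2 = Pipe(True)    # full duplex

# the half-duplex ends sit on a FIFO (anonymous pipe)...
print stat.S_ISFIFO(os.fstat(r.fileno()).st_mode)     # True
# ...whereas the full-duplex ends sit on a socket
print stat.S_ISSOCK(os.fstat(c1.fileno()).st_mode)    # True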

I'm not sure what you mean by the term "asynchronous" in this context. If you mean "non-blocking I/O", then it's worth noting that both implementations use blocking I/O by default.
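For what it's worth, Connection objects do have a poll() method, which lets you check for readability without blocking, but send() and recv() themselves will block. For example:

from multiprocessing import Pipe

r, w = Pipe(False)
print r.poll(0)     # False - no data yet, returns immediately
w.send("hello")
print r.poll(0)     # True - data is waiting
print r.recv()      # "hello" - but recv() would block on an empty pipe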


Secondly, it's worth noting the pickled size of the data you're trying to send...

>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154

Thirdly, from the pipe(7) man page...

Pipe capacity

A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity. Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 65536 bytes.
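You can verify the capacity on your own system by filling a pipe in non-blocking mode until the write fails. A minimal sketch, assuming Linux:

import errno
import fcntl
import os

r, w = os.pipe()
# make writes fail with EAGAIN instead of blocking once the pipe is full
fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)

total = 0
try:
    while True:
        total += os.write(w, '\0' * 4096)
except OSError, e:
    if e.errno != errno.EAGAIN:
        raise

print "pipe capacity: %d bytes" % total    # 65536 on Linux >= 2.6.11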


So, in effect, you've created a deadlock where all the subprocesses have blocked on the pipe_out.send() call, and none of them can receive any data from the other processes, because you're sending all 237,154 bytes of data in one hit, which has filled the 65,536 byte buffer.

You might be tempted just to use the Unix domain socket version, but the only reason it works at present is that it has a larger buffer size than a pipe, and you'll find that solution will also fail if you increase the number of DATA_POINTS to 100,000.
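You can check how much headroom the socket version gives you by querying the send buffer size of a socket pair, although the exact figure is system-dependent:

import socket

s1, s2 = socket.socketpair()
print s1.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
s1.close()
s2.close()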

The "quick n' dirty hack" solution is to break the data into smaller chunks for sending, but it's not good practice to rely on the buffers being a specific size.
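By way of illustration, such a hack might look something like the following, where exchange_chunked() is a hypothetical helper, and which assumes every process sends the same number of chunks (as they do in your example), so the send and recv calls can be interleaved in lock-step:

CHUNK = 4096    # elements per send; pickles to well under the 65536-byte buffer

def exchange_chunked(pipe_in, pipe_out, list_):
    # send one small piece, then immediately drain one piece from the
    # neighbouring process, so no pipe buffer ever holds more than a
    # chunk or two at a time
    received = []
    for start in xrange(0, len(list_), CHUNK):
        pipe_out.send(list_[start:start + CHUNK])
        received.append(pipe_in.recv())
    return received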

A better solution would be to use non-blocking I/O for the pipe_out.send() call, although I'm not familiar enough with the multiprocessing module to determine the best way to achieve it using that module.

The pseudocode would be something along the lines of...

while 1:
    if we have sent all data and received all data:
        break
    send as much data as we can without blocking
    receive as much data as we can without blocking
    if we didn't send or receive anything in this iteration:
        sleep for a bit so we don't waste CPU time
        continue

...or you could use the Python select module to avoid sleeping for longer than necessary, although integrating it with multiprocessing.Pipe may, again, be tricky.
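For example, a sketch of that approach might drive the raw file descriptors with select() and a hypothetical length-prefixed framing; since it bypasses Connection's own framing, every process in the ring would have to use the same function (assumes Unix):

import errno
import fcntl
import os
import select
import struct
from cPickle import dumps, loads

def exchange_select(pipe_in, pipe_out, obj):
    payload = dumps(obj, 2)
    out_buf = struct.pack('!I', len(payload)) + payload
    rfd, wfd = pipe_in.fileno(), pipe_out.fileno()

    # make writes fail with EAGAIN instead of blocking on a full pipe
    flags = fcntl.fcntl(wfd, fcntl.F_GETFL)
    fcntl.fcntl(wfd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    in_buf, expected = '', None
    while out_buf or expected is None or len(in_buf) < expected:
        # only ask select() about writability while we have data to send
        wlist = [wfd] if out_buf else []
        rready, wready, _ = select.select([rfd], wlist, [])
        if wready:
            try:
                n = os.write(wfd, out_buf[:4096])
                out_buf = out_buf[n:]
            except OSError, e:
                if e.errno != errno.EAGAIN:
                    raise
        if rready:
            in_buf += os.read(rfd, 4096)
            if expected is None and len(in_buf) >= 4:
                expected = struct.unpack('!I', in_buf[:4])[0]
                in_buf = in_buf[4:]
    return loads(in_buf)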

It's possible that the multiprocessing.Queue class does all of this for you, but I've never used it before, so you'd have to do some experimenting.
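For reference, your example might end up looking something like this with multiprocessing.Queue, whose put() hands the data to a background feeder thread rather than writing to the pipe directly, so it shouldn't block even when the payload exceeds the pipe capacity (an untested sketch):

from multiprocessing import Process, Queue
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 10000

def arg_passer(queue_in, queue_out, list_):
    my_pid = getpid()
    # put() returns as soon as the feeder thread has taken the data,
    # so the process can move straight on to draining its own queue
    queue_out.put(list_)
    buf = queue_in.get()
    print "{}: received {} points".format(my_pid, len(buf))

if __name__ == "__main__":
    queues = [Queue() for _ in xrange(PROC_NR)]
    data = [randn(DATA_POINTS) for _ in xrange(PROC_NR)]
    # process i writes to queues[i] and reads from its neighbour's queue
    processes = [Process(target=arg_passer,
                         args=(queues[i - 1], queues[i], data[i]))
                 for i in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()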

Answered on 2013-04-24T12:59:22.567