在我的应用程序中,我使用来自多处理模块的管道在 python 进程之间进行通信。最近我观察到一种奇怪的行为,具体取决于我通过它们发送的数据的大小。根据 python 文档,这些管道基于连接并且应该以异步方式运行,但有时它们会在发送时卡住。如果我在每个连接中启用全双工,一切正常,即使我没有使用连接来发送和收听。谁能解释这种行为?
- 100 个浮点数,全双工禁用
代码工作,利用异步性。 - 100 个浮点数,启用全双工
该示例按预期工作正常。 - 10000 个浮点数,全双工禁用
即使处理较小的数据也没问题,执行被永远阻塞。 - 10000 个浮点数,全双工
再次启用 Fine。
代码(这不是我的生产代码,它只是说明了我的意思):
from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid
PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000
def arg_passer(pipe_in, pipe_out, list_):
my_pid = getpid()
print "{}: Before send".format(my_pid)
pipe_out.send(list_)
print "{}: After send, before recv".format(my_pid)
buf = pipe_in.recv()
print "{}: After recv".format(my_pid)
if __name__ == "__main__":
pipes = [Pipe(False) for _ in range(PROC_NR)]
# pipes = [Pipe(True) for _ in range(PROC_NR)]
pipes_in = deque(p[0] for p in pipes)
pipes_out = deque(p[1] for p in pipes)
pipes_in.rotate(1)
pipes_out.rotate(-1)
data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
for foo in xrange(PROC_NR)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()