multiprocessing.Pool
已经有一个共享的结果队列,不需要额外涉及一个Manager.Queue
. Manager.Queue
是queue.Queue
引擎盖下的(多线程队列),位于单独的服务器进程上并通过代理公开。与 Pool 的内部队列相比,这增加了额外的开销。与依赖 Pool 的本机结果处理相反,结果Manager.Queue
也不能保证是有序的。
工作进程没有启动.apply_async()
,这在您实例化时已经发生Pool
。当你打电话时开始的pool.apply_async()
是一个新的“工作”。Pool 的工作进程在后台运行multiprocessing.pool.worker
-function。该函数负责处理通过 Pool 内部传输的新“任务” Pool._inqueue
,并将结果通过Pool._outqueue
. 您指定的func
将在multiprocessing.pool.worker
. func
只需要return
一些东西,结果将自动发送回父级。
.apply_async()
立即(异步)返回一个AsyncResult
对象(别名ApplyResult
)。您需要.get()
在该对象上调用(正在阻塞)以接收实际结果。另一种选择是注册一个回调函数,一旦结果准备好就会触发该函数。
from multiprocessing import Pool
def busy_foo(i):
"""Dummy function simulating cpu-bound work."""
for _ in range(int(10e6)): # do stuff
pass
return i
if __name__ == '__main__':
with Pool(4) as pool:
print(pool._outqueue) # DEMO
results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
# `.apply_async()` immediately returns AsyncResult (ApplyResult) object
print(results[0]) # DEMO
results = [res.get() for res in results]
print(f'result: {results}')
示例输出:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
注意:指定timeout
-parameter for.get()
不会停止 worker 中任务的实际处理,它只会通过提高 a 来解除等待的父级multiprocessing.TimeoutError
。