107

multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process. 但是如何与以 开头的异步工作进程共享队列apply_async?我不需要动态加入或其他任何东西,只是工人(反复)将他们的结果报告回基地的一种方式。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

这失败了: RuntimeError: Queue objects should only be shared between processes through inheritance。我理解这意味着什么,并且我理解继承而不是要求酸洗/解酸(以及所有特殊的 Windows 限制)的建议。但是我如何一种有效的方式通过队列呢?我找不到一个例子,我尝试了几种以各种方式失败的替代方案。请帮忙?

4

2 回答 2

151

尝试使用multiprocessing.Manager来管理您的队列并使其可供不同的工作人员访问。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
于 2012-03-29T15:08:42.947 回答
19

multiprocessing.Pool已经有一个共享的结果队列,不需要额外涉及一个Manager.Queue. Manager.Queuequeue.Queue引擎盖下的(多线程队列),位于单独的服务器进程上并通过代理公开。与 Pool 的内部队列相比,这增加了额外的开销。与依赖 Pool 的本机结果处理相反,结果Manager.Queue也不能保证是有序的。

工作进程没有启动.apply_async(),这在您实例化时已经发生Pool。当你打电话时开始pool.apply_async()是一个新的“工作”。Pool 的工作进程在后台运行multiprocessing.pool.worker-function。该函数负责处理通过 Pool 内部传输的新“任务” Pool._inqueue,并将结果通过Pool._outqueue. 您指定的func将在multiprocessing.pool.worker. func只需要return一些东西,结果将自动发送回父级。

.apply_async() 立即(异步)返回一个AsyncResult对象(别名ApplyResult)。您需要.get()在该对象上调用(正在阻塞)以接收实际结果。另一种选择是注册一个回调函数,一旦结果准备好就会触发该函数。

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

示例输出:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

注意:指定timeout-parameter for.get()不会停止 worker 中任务的实际处理,它只会通过提高 a 来解除等待的父级multiprocessing.TimeoutError

于 2019-04-08T16:23:55.863 回答