10

我创建了 100 个子进程

proc_list = [
    Process(target = simulator, args=(result_queue,))
    for i in xrange(100)]

并启动它们

for proc in proc_list: proc.start()

每个进程在进行一些处理后放入 result_queue(multiprocessing.Queue 的实例)10000 个元组。

def simulate(alg_instance, image_ids, gamma, results,
                     simulations, sim_semaphore):
  (rs, qs, t_us) =  alg_instance.simulate_multiple(image_ids, gamma,
                                             simulations)
  all_tuples = zip(rs, qs, t_us)
  for result in all_tuples:
    results.put(result)
  sim_semaphore.release()

我应该(?)在队列中获得 1000000 个元组,但经过各种运行后,我得到了这些(样本)大小:14912 19563 12952 13524 7487 18350 15986 11928 14281 14282 7317

有什么建议么?

4

3 回答 3

22

我对多处理问题的解决方案几乎总是使用 Manager 对象。虽然暴露的接口是相同的,但底层实现要简单得多,并且错误更少。

from multiprocessing import Manager
manager = Manager()
result_queue = manager.Queue()

尝试一下,看看它是否不能解决您的问题。

于 2012-07-11T23:33:07.313 回答
7

multiprocessing.Queue 在其文档中被称为是线程安全的。但是当你使用 Queue 进行进程间通信时,它应该与 multiprocessing.Manager().Queue() 一起使用

于 2016-09-25T00:46:39.763 回答
1

OP帖子中没有任何证据表明multiprocessing.Queue它不起作用。OP 发布的代码根本不足以理解发生了什么:他们加入了所有流程吗?他们是否正确地将队列传递给子进程(如果它在 Windows 上,则必须作为参数)?他们的子进程是否验证他们实际上得到了 10000 个元组?等等

有可能 OP 确实遇到了一个难以重现的错误mp.Queue,但考虑到 CPython 的测试量,以及我刚刚运行 100 个进程 x 10000 个结果而没有任何问题的事实,我怀疑 OP 实际上他们自己的代码有问题。

是的,Manager().Queue()在其他答案中提到的是一种非常好的共享数据的方式,但是没有理由避免multiprocessing.Queue()基于未经证实的“它有问题”的报告。

于 2017-03-12T18:13:18.060 回答