21

我正在使用multiprocessing.imap_unordered对值列表执行计算:

def process_parallel(fnc, some_list):
    pool = multiprocessing.Pool()
    for result in pool.imap_unordered(fnc, some_list):
        for x in result:
            yield x
    pool.terminate()

根据设计,每次调用fnc都会返回一个 HUGE 对象。我可以将此类对象的 N 个实例存储在 RAM 中,其中 N ~ cpu_count,但不多(不是数百个)。

现在,使用这个函数会占用太多内存。内存完全用在主进程中,而不是在工人中。

如何imap_unordered存储完成的结果?我的意思是工作人员已经返回但尚未传递给用户的结果。我认为它很聪明,只根据需要“懒惰地”计算它们,但显然不是。

看起来因为我不能process_parallel足够快地消耗结果,池不断地从fnc某个地方排队这些巨大的对象,内部,然后爆炸。有没有办法避免这种情况?以某种方式限制其内部队列?


我正在使用 Python2.7。干杯。

4

2 回答 2

12

通过查看相应的源文件 ( ) 可以看到python2.7/multiprocessing/pool.py,IMapUnorderedIterator 使用一个collections.deque实例来存储结果。如果有一个新项目进来,它会在迭代中添加和删除。

正如您所建议的,如果在主线程仍在处理该对象时另一个巨大的对象进入,那么这些对象也将存储在内存中。

你可能会尝试是这样的:

it = pool.imap_unordered(fnc, some_list)
for result in it:
    it._cond.acquire()
    for x in result:
        yield x
    it._cond.release()

如果它试图将下一个对象放入双端队列,这应该会导致任务结果接收器线程在处理项目时被阻塞。因此,内存中的巨大对象不应超过两个。如果这适用于您的情况,我不知道;)

于 2012-06-27T10:14:58.273 回答
2

我能想到的最简单的解决方案是添加一个闭包来包装您的fnc函数,该函数将使用信号量来控制一次可以执行的同时作业执行的总数(我假设主进程/线程将增加信号量)。可以根据作业大小和可用内存计算信号量值。

于 2012-06-29T00:01:10.700 回答