4

以下脚本生成 100 个大小为 100000 的随机字典,将每个 (key, value) 元组馈送到队列中,同时一个单独的进程从队列中读取:

import multiprocessing as mp

import numpy.random as nr


def get_random_dict(_dummy):
    return dict((k, v) for k, v in enumerate(nr.randint(pow(10, 9), pow(10, 10), pow(10, 5))))

def consumer(q):
    for (k, v) in iter(q.get, 'STOP'):
        pass

q = mp.Queue()
p = mp.Process(target=consumer, args=(q,))
p.start()
for d in mp.Pool(1).imap_unordered(get_random_dict, xrange(100)):
    for k, v in d.iteritems():
        q.put((k, v))
q.put('STOP')
p.join()

我期望内存使用量保持不变,因为消费者进程在主进程提供数据时从队列中提取数据。我验证了数据不会在队列中累积。

但是,我监控了内存消耗,并且随着脚本的运行它不断增加。如果我替换imap_unorderedfor _ in xrange(100): d = get_random_dict(),那么内存消耗是恒定的。解释是什么?

4

2 回答 2

1

我认为主要问题是multiprocessing.Pool用于收集在一个进程(Pool进程)中创建的字典,然后将它们放入主进程的队列中。我认为(我可能错了)它Pool会创建一些自己的队列,而这些可能是数据积累的队列。

如果您像这样放置一些调试打印,您可以清楚地看到这一点:

...
def get_random_dict(_dummy):
    print 'generating dict'
    ...
...
for d in mp.Pool(1).imap_unordered(get_random_dict, xrange(100)):
    print 'next d'
    ...

然后你会看到这样的东西:

generating dict
generating dict
next d
generating dict
generating dict
generating dict
generating dict
generating dict
next d
...

dict这清楚地表明您在某处(可能在 的内管中)积累了那些生成的 s Pool

我认为更好的解决方案是将数据 get_random_dict直接放入队列并放弃*map使用Pool.

于 2014-11-07T15:25:48.123 回答
1

Pool.imap并不完全等同于imap. 它的相同之处在于它可以像这样使用imap并且它返回一个迭代器。但是,实现方式完全不同。后备池将尽其所能尽快完成分配给它的所有工作,无论迭代器的消耗速度有多快。如果您只想在请求时处理作业,那么使用multiprocessing. 不妨使用itertools.imap并完成它。

因此,您的内存消耗增加的原因是因为池创建字典的速度比您的消费者进程消耗它们的速度快。这是因为池从工作进程检索结果的方式是单向的(一个进程写入和进程读取),因此不需要显式同步机制。而 aQueue是双向的——两个进程都可以读取和写入队列。这意味着需要在使用队列的进程之间进行显式同步,以确保它们不会竞争将下一个项目添加到队列或从队列中删除项目(从而使队列处于不一致状态)。

于 2014-11-07T15:53:46.633 回答