24

我正在使用multiprocessing.Pool()并行化一些繁重的计算。

目标函数返回大量数据(一个巨大的列表)。我的内存快用完了。

如果没有multiprocessing,我只需将目标函数更改为生成器,通过yield一个接一个地生成结果元素,因为它们是计算出来的。

我知道多处理不支持生成器——它等待整个输出并立即返回,对吗?没有屈服。有没有办法让Pool工作人员在数据可用时立即生成数据,而无需在 RAM 中构建整个结果数组?

简单的例子:

def target_fnc(arg):
   result = []
   for i in xrange(1000000):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        for element in result:
            yield element

这是 Python 2.7。

4

3 回答 3

17

这听起来像是队列的理想用例:http: //docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

只需将您的结果从池中的工作人员中输入队列,然后将它们摄取到主服务器中。

请注意,除非您排空队列的速度几乎与工作人员填充队列的速度一样快,否则您仍然可能会遇到内存压力问题。您可以限制队列大小(队列中可以容纳的最大对象数),在这种情况下,池中的工作人员将阻塞queue.put语句,直到队列中有可用空间。这将对内存使用设置上限。 如果你这样做,可能是时候重新考虑是否需要池化和/或使用更少的工人是否有意义。

于 2013-02-03T21:21:07.723 回答
4

根据您的描述,听起来您对处理传入的数据并没有那么感兴趣,而是避免将一百万个元素list传回。

有一种更简单的方法:只需将数据放入文件中即可。例如:

def target_fnc(arg):
    fd, path = tempfile.mkstemp(text=True)
    with os.fdopen(fd) as f:
        for i in xrange(1000000):
            f.write('dvsdbdfbngd\n')
    return path

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        with open(result) as f:
            for element in f:
                yield element

显然,如果您的结果可以包含换行符,或者不是字符串等,您将希望使用csv文件 anumpy等而不是简单的文本文件,但想法是相同的。

话虽如此,即使这更简单,一次处理数据块通常也有好处,因此Queue如果有缺点(分别,需要一种方法来分解任务,或者必须能够像生成数据一样快地使用数据)不是交易破坏者。

于 2013-02-03T22:46:28.430 回答
3

如果你的任务可以以块的形式返回数据……它们可以分解成更小的任务,每个任务都返回一个块吗?显然,这并不总是可能的。如果不是,则必须使用其他一些机制(如QueueLoren Abrams 建议的 a )。但是当它时,由于其他原因,它可能是一个更好的解决方案,以及解决这个问题。

以你的例子,这当然是可行的。例如:

def target_fnc(arg, low, high):
   result = []
   for i in xrange(low, high):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    pool_args = []
    for low in in range(0, 1000000, 10000):
        pool_args.extend(args + [low, low+10000] for args in some_args)
    for result in pool.imap_unordered(target_fnc, pool_args):
        for element in result:
            yield element

zip(如果您愿意,您当然可以用嵌套理解或and替换循环flatten。)

所以,如果some_args[1, 2, 3],你会得到 300 个任务——<code>[[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], ...] , 每个只返回 10000 个元素而不是 1000000。

于 2013-02-03T22:33:24.693 回答