23

当您map对 amultiprocessing.Pool进行迭代时,是否在开始时将迭代划分为池中每个进程的队列,或者是否有一个公共队列,当进程空闲时从该队列中获取任务?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

因此,鉴于此未经测试的建议代码;如果池中有 4 个进程,是否每个进程都分配了 25 件事情要做,或者 100 件事情被寻找要做的事情的进程一一挑选出来,这样每个进程可能会做不同数量的事情,例如 30 , 26, 24, 20。

4

3 回答 3

27

因此,鉴于此未经测试的建议代码;如果池中有 4 个进程,是否每个进程都分配了 25 件事情要做,或者 100 件事情被寻找要做的事情的进程一一挑选出来,这样每个进程可能会做不同数量的事情,例如 30 , 26, 24, 20。

好吧,显而易见的答案是测试它。

照原样,测试可能不会告诉您太多,因为作业将尽快完成,并且即使池化进程在它们准备好时抢占作业,事情也可能最终均匀分布。但是有一个简单的方法可以解决这个问题:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

如果数字是“参差不齐的”,那么您要么知道池中的进程必须在准备好新作业的情况下获取新作业。(我明确设置chunksize为 1 以确保块不会太大,以至于每个块首先只得到一个块。)

当我在 8 核机器上运行它时:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

因此,看起来这些流程正在快速获得新的工作。

由于您特别询问了 4 名工人,我改为Pool()Pool(4)得到了这个:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

但是,有一种比测试更好的方法来找出答案:阅读源代码

如您所见,map只需调用map_async,它会创建一堆批次并将它们放在一个self._taskqueue对象(一个Queue.Queue实例)上。如果您进一步阅读,这个队列不会直接与其他进程共享,但是有一个池管理器线程,每当一个进程完成并返回结果时,它就会从队列中弹出下一个作业并将其提交回进程。

这也是您可以找出默认块大小的方法map。上面链接的 2.7 实现表明它只是len(iterable) / (len(self._pool) * 4)四舍五入(比避免小数算术稍微详细一点)——或者,换句话说,对于每个进程大约 4 个块来说足够大。但是你真的不应该依赖这个;该文档含糊地和间接地暗示它将使用某种启发式方法,但并没有给您任何关于那将是什么的保证。因此,如果您确实需要“每个进程大约 4 个块”,请明确计算。更实际地,如果您需要除默认值之外的任何东西,您可能需要一个特定于域的值来计算(通过计算、猜测或分析)。

于 2012-11-07T07:14:55.303 回答
3

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

此方法将可迭代对象分割成多个块,将它们作为单独的任务提交给进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。

我假设一个进程在完成前一个块时从队列中拾取下一个块。

默认值chunksize取决于长度,iterable并且选择了这样块的数量大约是进程数量的四倍。(来源)

于 2012-11-07T07:02:17.513 回答
1

chunksize要在不查看其multiprocessing模块源代码的情况下估计Python 实现使用的情况,请运行:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

它表明 ,imap默认使用和imap_unorderedfor取决于, (每个进程的块数不固定)并且取决于 python 版本。如果指定了参数,所有函数都会考虑参数。chunksize=1max_chunksizemapnprocessesnitemmax_chunksize*map*chunksize

用法

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

了解个人工作是如何分配的;指定--verbose参数。

于 2012-11-07T08:51:07.387 回答