因此,鉴于此未经测试的建议代码;如果池中有 4 个进程,是否每个进程都分配了 25 件事情要做,或者 100 件事情被寻找要做的事情的进程一一挑选出来,这样每个进程可能会做不同数量的事情,例如 30 , 26, 24, 20。
好吧,显而易见的答案是测试它。
照原样,测试可能不会告诉您太多,因为作业将尽快完成,并且即使池化进程在它们准备好时抢占作业,事情也可能最终均匀分布。但是有一个简单的方法可以解决这个问题:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
#print moo
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
如果数字是“参差不齐的”,那么您要么知道池中的进程必须在准备好新作业的情况下获取新作业。(我明确设置chunksize
为 1 以确保块不会太大,以至于每个块首先只得到一个块。)
当我在 8 核机器上运行它时:
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
因此,看起来这些流程正在快速获得新的工作。
由于您特别询问了 4 名工人,我改为Pool()
并Pool(4)
得到了这个:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
但是,有一种比测试更好的方法来找出答案:阅读源代码。
如您所见,map
只需调用map_async
,它会创建一堆批次并将它们放在一个self._taskqueue
对象(一个Queue.Queue
实例)上。如果您进一步阅读,这个队列不会直接与其他进程共享,但是有一个池管理器线程,每当一个进程完成并返回结果时,它就会从队列中弹出下一个作业并将其提交回进程。
这也是您可以找出默认块大小的方法map
。上面链接的 2.7 实现表明它只是len(iterable) / (len(self._pool) * 4)
四舍五入(比避免小数算术稍微详细一点)——或者,换句话说,对于每个进程大约 4 个块来说足够大。但是你真的不应该依赖这个;该文档含糊地和间接地暗示它将使用某种启发式方法,但并没有给您任何关于那将是什么的保证。因此,如果您确实需要“每个进程大约 4 个块”,请明确计算。更实际地,如果您需要除默认值之外的任何东西,您可能需要一个特定于域的值来计算(通过计算、猜测或分析)。