
How can I make multiprocessing.pool.map distribute processes in numerical order?


More Info:
I have a program which processes a few thousand data files, making a plot of each one. I'm using a multiprocessing.pool.map to distribute each file to a processor and it works great. Sometimes this takes a long time, and it would be nice to look at the output images as the program is running. This would be a lot easier if the map process distributed the snapshots in order; instead, for the particular run I just executed, the first 8 snapshots analyzed were: 0, 78, 156, 234, 312, 390, 468, 546. Is there a way to make it distribute them more closely to in numerical order?


Example:
Here's a sample code which contains the same key elements and shows the same basic result:

import sys
from multiprocessing import Pool
import time

num_proc = 4
num_calls = 20
sleeper = 0.1

def SomeFunc(arg):
    time.sleep(sleeper)
    print("%5d" % arg, end=" ")
    sys.stdout.flush()     # otherwise doesn't print properly on a single line

if __name__ == "__main__":
    proc_pool = Pool(num_proc)
    proc_pool.map(SomeFunc, range(num_calls))

Yields:

   0  4  2  6   1   5   3   7   8  10  12  14  13  11   9  15  16  18  17  19

Answer:

From @Hayden: use the 'chunksize' parameter of map, whose signature is def map(self, func, iterable, chunksize=None).

More Info:
The chunksize determines how many iterations are allocated to each worker process at a time. My example above, for instance, uses a chunksize of 2, which means that each worker goes off and runs the function for 2 iterations, then comes back for more (a 'check-in'). The trade-off behind chunksize: each check-in carries synchronization overhead when a worker has to sync up with the pool, which argues for a large chunksize. On the other hand, with large chunks one worker might finish its chunk while another still has a long time left to go, which argues for a small chunksize. The deciding factor is how much the run time of each function call varies. If all calls take about the same amount of time, a large chunksize is far more efficient. If some calls can take twice as long as others, a small chunksize keeps workers from sitting idle.

For my problem, every function call should take very close to the same amount of time (I think), so if I want the tasks dispatched in order, I'm going to sacrifice some efficiency to the check-in overhead.
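To make the chunking concrete, here is a rough sketch of how a task list gets grouped into contiguous chunks of size chunksize (a simplification of what multiprocessing.pool does internally, not its actual code):

```python
# Sketch: split a task list into contiguous chunks of size `chunksize`.
# Each chunk is handed to one worker as a unit, which is why a large
# chunksize means fewer check-ins but coarser load balancing.
def chunked(tasks, chunksize):
    tasks = list(tasks)
    return [tasks[i:i + chunksize] for i in range(0, len(tasks), chunksize)]

print(chunked(range(8), 2))   # [[0, 1], [2, 3], [4, 5], [6, 7]]
```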


2 Answers


This happens because each process is given a predefined amount of work up front when map is called, and that amount depends on chunksize. We can work out the default chunksize by looking at the source of pool.map:

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
    chunksize += 1

So for a range of 20 and 4 processes, we get a chunksize of 2.
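As a sanity check, the same divmod arithmetic can be reproduced directly (a minimal sketch mirroring the snippet above, not the actual pool internals):

```python
# Reproduce the default-chunksize calculation from multiprocessing.pool
# for the example above: 20 tasks across a pool of 4 processes.
def default_chunksize(num_tasks, num_procs):
    chunksize, extra = divmod(num_tasks, num_procs * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(20, 4))  # divmod(20, 16) -> (1, 4), so chunksize becomes 2
```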

If we modify your code to pass this chunksize explicitly, we should get results similar to what you are seeing now:

proc_pool.map(SomeFunc, range(num_calls), chunksize=2)

This produces the output:

0 2 6 4 1 7 5 3 8 10 12 14 9 13 15 11 16 18 17 19

Now, setting chunksize=1 will ensure that each process in the pool is given only one task at a time:

proc_pool.map(SomeFunc, range(num_calls), chunksize=1)

This should ensure a reasonably good numerical ordering compared with leaving the chunksize unspecified. For example, a chunksize of 1 produces the output:

0 1 2 3 4 5 6 7 9 10 8 11 13 12 15 14 16 17 19 18

Answered 2013-07-28T01:32:14.030

What about changing map to imap?

import os
from multiprocessing import Pool
import time

num_proc = 4
num_calls = 20
sleeper = 0.1

def SomeFunc(arg):
    time.sleep(sleeper)
    print("%s %5d" % (os.getpid(), arg))
    return arg

if __name__ == "__main__":
    proc_pool = Pool(num_proc)
    list(proc_pool.imap(SomeFunc, range(num_calls)))

The reason is probably that the default chunksize for imap is 1, so the tasks may be dispatched more nearly in order than with map.

Answered 2013-07-28T00:04:40.350