235

I'm trying to learn how to use Python's multiprocessing package, but I don't understand the difference between map_async and imap. I noticed that both map_async and imap are executed asynchronously. So when should I use one over the other? And how should I retrieve the result returned by map_async?

Should I use something like this?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i
4

2 回答 2

620

imap/imap_unorderedmap/之间有两个主要区别map_async

  1. 他们使用您传递给他们的可迭代对象的方式。
  2. 他们将结果返回给您的方式。

map通过将可迭代对象转换为列表(假设它还不是列表)、将其分成块并将这些块发送到Pool. 将 iterable 分成块比在进程之间一次传递一个 item 的 iterable 中的每个项目执行得更好——尤其是在 iterable 很大的情况下。但是,将可迭代对象转换为列表以对其进行分块可能会产生非常高的内存成本,因为整个列表都需要保存在内存中。

imap不会将您提供的可迭代对象变成列表,也不会将其分成块(默认情况下)。它将一次迭代一个可迭代的元素,并将它们每个发送到一个工作进程。这意味着您不会将整个可迭代对象转换为列表而对内存造成影响,但这也意味着大型可迭代对象的性能较慢,因为缺少分块。然而,这可以通过传递一个chunksize大于默认值 1 的参数来缓解。

imap/imap_unorderedmap/之间的另一个主要区别map_async是,使用imap/ imap_unordered,您可以在工作人员准备好后立即开始接收结果,而不必等待所有工作人员完成。使用map_async, anAsyncResult会立即返回,但在所有结果都被处理之前,您实际上无法从该对象中检索结果,此时它返回的列表map与(map实际上在内部实现为map_async(...).get())相同。没有办法得到部分结果;您要么拥有全部结果,要么一无所有。

imap并且imap_unordered都立即返回可迭代对象。使用imap,结果将在准备好后立即从可迭代中产生,同时仍保留输入可迭代的顺序。使用imap_unordered,结果将在它们准备好后立即产生,而不管输入可迭代的顺序如何。所以,假设你有这个:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

这将输出:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

如果你使用p.imap_unordered而不是p.imap,你会看到:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

如果你使用p.mapor p.map_async().get(),你会看到:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

imap因此,使用/ imap_unorderedover的主要原因map_async是:

  1. 您的可迭代对象足够大,以至于将其转换为列表会导致您耗尽/使用太多内存。
  2. 您希望能够在所有结果完成之前开始处理结果。
于 2014-10-23T04:51:41.540 回答
10

接受的答案指出,imap_unordered“一旦准备好,就会产生结果”,人们可能会推断出结果将按完成的顺序返回。但我只想明确指出,这通常不是真的。该文档指出,结果以任意顺序返回。考虑以下程序,它使用 4 的池大小、20的可迭代大小和 5 的块大小值。worker函数根据其传递的参数休眠可变的时间,这也确保池中没有任何进程抓取所有提交的任务。因此,我希望池中的每个进程都有20 / 4 = 5要处理的任务:

from multiprocessing import Pool
import time

def worker(x):
    print(f'x = {x}', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)

印刷:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

您可以清楚地看到这些结果不是按完成顺序产生的。例如,我已经返回1621512519.7743165 9后跟1621512515.268784 0,它是由工作函数返回的,比之前返回的结果早 4 秒以上。但是,如果我将chunksize值更改为 1,打印输出将变为:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

按完成顺序。但是,我不愿声明,如果指定chunksize值为 1,imap_unordered 则始终返回可用的结果,尽管基于此实验似乎是这种情况,因为文档没有这样的声明。

讨论

chunksize指定为 5 时,这 20 个任务被放置在单个输入队列中,供池中的 4 个进程以大小为 5 的块进行处理。因此,空闲的进程将从队列中取出下一个 5 个任务块并在再次空闲之前依次处理它们中的每一个。因此,第一个进程将处理x参数 0 到 4,第二个进程将处理x参数 5 到 9 等。这就是为什么您看到初始x值打印为 0、5、10 和 15 的原因。

但是,虽然x参数 0 的结果在x参数 9 的结果之前完成,但结果似乎作为块一起写出,因此x参数 0 的结果将不会返回,直到x在同一队列中排队的参数的结果块(即 1、2、3 和 4)也可用。

于 2021-05-20T12:26:32.790 回答