14

我有一个256x256x256Numpy 数组,其中每个元素都是一个矩阵。我需要对这些矩阵中的每一个进行一些计算,并且我想使用该multiprocessing模块来加快速度。

这些计算的结果必须256x256x256像原来的那样存储在一个数组中,这样原数组中元素处的矩阵的结果就[i,j,k]必须放在[i,j,k]新数组的元素中。

为此,我想制作一个可以以伪方式编写的列表,[array[i,j,k], (i, j, k)]并将其传递给要“多处理”的函数。假设这matrices是从原始数组中提取的所有矩阵的列表,并且myfunc是进行计算的函数,代码看起来有点像这样:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

但是,似乎map_async实际上是finput首先创建了这个巨大的列表:我的 CPU 并没有做太多事情,但是内存和交换在几秒钟内就被完全消耗掉了,这显然不是我想要的。

有没有办法将这个巨大的列表传递给多处理函数,而无需先显式创建它?或者你知道解决这个问题的另一种方法吗?

非常感谢!:-)

4

3 回答 3

12

一旦调用函数,所有multiprocessing.Pool.map*方法都会完全使用迭代器(演示代码) 。要一次给迭代器的 map 函数块提供一个块,请使用grouper_nofill

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it=iter(iterable)
    def take():
        while 1: yield list(itertools.islice(it,n))
    return iter(take().next,[])

chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)

PS。pool.map_asyncchunksize参数做了一些不同的事情:它将迭代分解成块,然后将每个块提供给调用map(func,chunk). 如果完成得太快,这可以为工作进程提供更多数据来咀嚼func(item),但它对您的情况没有帮助,因为在map_async发出调用后迭代器仍然会立即被完全消耗。

于 2011-09-05T11:22:32.040 回答
2

我也遇到了这个问题。而不是这个:

res = p.map(func, combinations(arr, select_n))

res = p.imap(func, combinations(arr, select_n))

imap 不消耗它!

于 2014-01-14T15:38:44.167 回答
0

Pool.map_async()需要知道可迭代的长度以将工作分派给多个工作人员。由于iziphas no __len__,它首先将可迭代对象转换为列表,从而导致您遇到大量内存使用。

您可以尝试通过创建自己的izip风格迭代器来回避这个问题__len__

于 2011-09-05T11:00:03.320 回答