13

我正在按顺序对 3 个不同的 numpy 2D 数组执行一些大型计算。阵列很大,每个 25000x25000。每次计算都需要大量时间,因此我决定在服务器上的 3 个 CPU 内核上并行运行其中的 3 个。我遵循标准的多处理指南并创建 2 个进程和一个工作函数。两个计算通过 2 个进程运行,第三个计算在本地运行,没有单独的进程。我将巨大的数组作为过程的参数传递,例如:

p1 = Process(target = Worker, args = (queue1, array1, ...)) # Some other params also going

p2 = Process(target = Worker, args = (queue2, array2, ...)) # Some other params also going

Worker 函数在附加在队列中的列表中发回两个 numpy 向量(一维数组),例如:

queue.put([v1, v2])

我没有使用multiprocessing.pool

但令人惊讶的是我没有得到加速,它实际上运行速度慢了 3 倍。传递大型数组需要时间吗?我无法弄清楚发生了什么。我应该使用共享内存对象而不是传递数组吗?

如果有人可以提供帮助,我将不胜感激。

谢谢你。

4

2 回答 2

2

我的问题似乎已解决。我在里面使用了一个 django 模块,我在里面调用了 multiprocessing.pool.map_async。我的工作函数是类本身内部的一个函数。这就是问题所在。多进程不能在另一个进程中调用同一类的函数,因为子进程不共享内存。所以在子进程内部没有类的活动实例。可能这就是它没有被调用的原因。据我了解。我从类中删除了该函数,并将其放在同一个文件中,但在类之外,就在类定义开始之前。有效。我也得到了适度的加速。还有一件事是面临同样问题的人请不要读取大型数组并在进程之间传递。酸洗和解酸会花费很多时间,而且您不会加快速度而是减慢速度。

如果可能,请使用 numpy.memmap 数组,它们非常快。

于 2013-11-08T19:37:24.667 回答
1

这是一个使用np.memmapand的示例Pool。看到您可以定义进程和工作人员的数量。在这种情况下,您无法控制队列,这可以通过以下方式实现multiprocessing.Queue

from multiprocessing import Pool

import numpy as np

def mysum(array_file_name, col1, col2, shape):
    a = np.memmap(array_file_name, shape=shape, mode='r+')
    a[:, col1:col2] = np.random.random((shape[0], col2-col1))
    ans = a[:, col1:col2].sum()
    del a
    return ans

if __name__ == '__main__':
    nop = 1000 # number_of_processes
    now = 3 # number of workers
    p = Pool(now)
    array_file_name = 'test.array'
    shape = (250000, 250000)
    a = np.memmap(array_file_name, shape=shape, mode='w+')
    del a
    cols = [[shape[1]*i/nop, shape[1]*(i+1)/nop] for i in range(nop)]
    results = []
    for c1, c2 in cols:
        r = p.apply_async(mysum, args=(array_file_name, c1, c2, shape))
        results.append(r)
    p.close()
    p.join()

    final_result = sum([r.get() for r in results])
    print final_result

如果可能,您可以使用共享内存并行处理来获得更好的性能。请参阅此相关问题:

于 2013-10-31T23:09:00.213 回答