1

我正在尝试利用concurrent.futures.ProcessPoolExecutorPython3并行处理一个大型矩阵。代码的一般结构是:

class X(object):

self.matrix

def f(self, i, row_i):
    <cpu-bound process>

def fetch_multiple(self, ids):
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(self.f, i, self.matrix.getrow(i)) for i in ids]
        return [f.result() for f in as_completed(futures)]

self.matrix是一个大的scipy csr_matrixf是我的并发函数,它需要一行self.matrix并在其上应用一个CPU 绑定的进程。最后,fetch_multiple是一个并行运行多个实例f并返回结果的函数。

问题是在运行脚本后,所有 cpu 核心的繁忙程度都低于 50%(见以下截图):

在此处输入图像描述

为什么所有核心都不忙?

self.matrix我认为问题在于进程之间传递行向量的大对象。我怎么解决这个问题?

4

1 回答 1

1

是的。开销不应该那么大 - 但这很可能是您的 CPU 出现中转的原因(尽管它们应该正忙于传递数据)。

但是尝试这里的配方,使用共享内存将对象的“指针”传递给子进程。

http://briansimulator.org/sharing-numpy-arrays-between-processes/

从那里引用:

from multiprocessing import sharedctypes
size = S.size
shape = S.shape
S.shape = size
S_ctypes = sharedctypes.RawArray('d', S)
S = numpy.frombuffer(S_ctypes, dtype=numpy.float64, count=size)
S.shape = shape

现在我们可以将 S_ctypes 和 shape 发送到 multiprocessing 中的子进程,并在子进程中将其转换回 numpy 数组,如下所示:

from numpy import ctypeslib
S = ctypeslib.as_array(S_ctypes)
S.shape = shape

处理引用计数应该很棘手,但我想numpy.ctypeslib会处理 - 所以,只需以它们不适用于相同数据的方式协调将实际行号传递给子进程

于 2017-09-01T02:05:13.840 回答