I have a large sparse matrix X in scipy.sparse.csr_matrix format and I would like to multiply it by a numpy array W, making use of parallelism. After some research I found that I need to use Array in multiprocessing to avoid copying X and W between processes (see e.g. How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:
import multiprocessing
import numpy
import scipy.sparse
import time

def initProcess(data, indices, indptr, shape, Warr, Wshp):
    # Make the shared buffers available as module-level globals in each worker
    global XData
    global XIndices
    global XIndptr
    global Xshape

    XData = data
    XIndices = indices
    XIndptr = indptr
    Xshape = shape

    global WArray
    global WShape

    WArray = Warr
    WShape = Wshp

def dot2(args):
    rowInds, i = args

    global XData
    global XIndices
    global XIndptr
    global Xshape

    # Rebuild X and W as numpy/scipy objects backed by the shared buffers
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    global WArray
    global WShape
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

def getMatmat(X):
    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)

    # Store the data in X as RawArray objects so we can share it among processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W):
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess,
                                    initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))

        params = []
        for i in range(numJobs):
            params.append((rowInds, i))

        iterator = pool.map(dot2, params)

        P = numpy.zeros((X.shape[0], W.shape[1]))
        for i in range(numJobs):
            P[rowInds[i]:rowInds[i+1], :] = iterator[i]

        return P

    return matmat

if __name__ == '__main__':
    # Create a random sparse matrix X and a random dense one W
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time() - startTime

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time() - startTime

    print(parallelTime, nonParallelTime)
However the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.
I believe slowdowns can occur in similar situations when one copies large data to the processes, but that should not be the case here since I use Array to store the shared variables (unless it happens in numpy.frombuffer or when creating the csr_matrix, but I could not find a way to share a csr_matrix directly). Another possible cause of the slowness is that each process returns a large result of its matrix multiplication, but I am not sure of a way around that.
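To illustrate what avoiding the copy-back might look like, here is a minimal sketch along the lines of my code above: the parent preallocates a RawArray for the output P and each worker writes its row block straight into it, so nothing large has to be pickled back through pool.map. The names initShared, dotBlock and PArray are placeholders I made up, I have not verified that this is actually faster, and it relies on the same fork-style sharing of RawArray objects as my code above.

import multiprocessing
import numpy
import scipy.sparse

def initShared(data, indices, indptr, shape, Warr, Wshp, Parr):
    # Expose all shared buffers, including the output buffer, as globals
    global XData, XIndices, XIndptr, Xshape, WArray, WShape, PArray
    XData, XIndices, XIndptr, Xshape = data, indices, indptr, shape
    WArray, WShape, PArray = Warr, Wshp, Parr

def dotBlock(args):
    rowInds, i = args
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)
    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape((Xshape[0], WShape[1]))
    # Write the block straight into the shared output buffer instead of returning it
    P[rowInds[i]:rowInds[i+1], :] = Xr[rowInds[i]:rowInds[i+1], :].dot(W)

if __name__ == '__main__':
    X = scipy.sparse.rand(10000, 8000, 0.1).tocsr()
    W = numpy.random.rand(8000, 20)
    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.linspace(0, X.shape[0], numJobs + 1).astype(numpy.int64)

    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)
    WArray = multiprocessing.RawArray("d", W.flatten())
    PArray = multiprocessing.RawArray("d", X.shape[0] * W.shape[1])  # zero-initialised output

    pool = multiprocessing.Pool(processes=numJobs, initializer=initShared,
                                initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape, PArray))
    pool.map(dotBlock, [(rowInds, i) for i in range(numJobs)])
    pool.close()
    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape((X.shape[0], W.shape[1]))

This removes the cost of returning each block from pool.map, but it does nothing about whatever cost the RawArray copies and the csr_matrix reconstruction themselves incur.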
Can anyone see where I am going wrong? Thanks for any help!
Update: I can't be certain, but I think sharing large amounts of data between processes is simply not that efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very hard). One way around this is to release the GIL using Cython for example (see http://docs.cython.org/src/userguide/parallelism.html), although a lot of the numpy functions need to go through the GIL.
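For completeness, a minimal sketch of the multithreading idea (threaded_matmat is a placeholder name): each thread computes one row block of X.dot(W) and writes it into a shared output array, so nothing is copied at all. Whether this is any faster depends entirely on how much of the sparse dot releases the GIL, which, as noted above, is often not the case.

import threading
import numpy
import scipy.sparse

def threaded_matmat(X, W, numThreads=4):
    # Split the rows of X into contiguous blocks, one per thread
    rowInds = numpy.linspace(0, X.shape[0], numThreads + 1).astype(numpy.int64)
    P = numpy.zeros((X.shape[0], W.shape[1]))

    def worker(i):
        # Threads share X, W and P directly, so no data is copied between them
        P[rowInds[i]:rowInds[i+1], :] = X[rowInds[i]:rowInds[i+1], :].dot(W)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(numThreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return P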