我编写了一个 Python 程序来并行执行一批矩阵乘法。以下代码将整个计算分配给四个独立的进程（为了简化演示，我删除了收集结果的代码）。由于矩阵乘法需要重复数千次，这四个进程会一直保持运行，并通过管道暂停/恢复。问题是：只恢复一个进程时，它很快就能完成（0.13 秒）；但恢复的进程越多，每个进程的耗时就越长（两个时 0.31 秒，三个时 0.45 秒，四个时 0.6 秒）。既然这四个进程是完全独立的，我真的不明白这里发生了什么。
import multiprocessing as mp
import numpy as np
import time
def worker(a, b, Ind):
    """Compute ``abs(a @ b[i:i+M, :])**2`` for every start row ``i`` in ``Ind``.

    Args:
        a: 1-D array of length M (the weight vector applied to each window).
        b: 2-D array of shape (rows, NN); every window ``b[i:i+M, :]`` must
            contain exactly M rows, otherwise the matrix product raises
            ``ValueError``.
        Ind: sequence of integer start rows into ``b``.

    Returns:
        float64 array of shape (NN, len(Ind)); column ``k`` holds
        ``abs(a @ b[Ind[k]:Ind[k]+M, :])**2``.

    Prints the elapsed wall-clock time of the call.

    NOTE(review): each call reads len(Ind) windows of M x NN values from
    ``b``, i.e. it is dominated by memory traffic. When several processes run
    this concurrently they share memory bandwidth (and numpy's BLAS may start
    its own thread pool in every process), which plausibly explains why the
    per-process time grows with the number of active workers. Confirm by
    e.g. running with OMP_NUM_THREADS=1 / MKL_NUM_THREADS=1.
    """
    st = time.perf_counter()  # monotonic clock intended for measuring intervals
    a = np.ravel(a)
    M = a.size
    # Promote `a` to the common dtype once (e.g. float64 -> complex128 when b
    # is complex) instead of letting the product re-cast it on every iteration.
    a = a.astype(np.result_type(a, b), copy=False)
    NN = b.shape[1]
    result = np.zeros((NN, len(Ind)))
    # loop over delay points: one column of `result` per start row
    for col, start in enumerate(Ind):
        window = b[start:start + M, :]
        result[:, col] = np.abs(a @ window) ** 2
    print(f'Elapsed time {time.perf_counter()-st} for {len(Ind)} loops')
    return result
def missions(a, b, Ind, Bob):
    """Serve ``worker(a, b, Ind)`` requests that arrive on connection ``Bob``.

    Message protocol (one message per ``Bob.recv()``):
      * ``1``    -> run ``worker`` once, then reply ``1`` to signal completion;
      * ``None`` -> stop serving and return;
      * anything else is ignored.

    The loop also returns when the other end of the pipe is closed
    (``Bob.recv()`` raises ``EOFError``), so the process can exit cleanly
    instead of having to be killed.
    """
    while True:
        try:
            # block until the controller sends something
            flag = Bob.recv()
        except EOFError:
            # Peer closed the pipe: nothing more can arrive, so stop.
            return
        if flag is None:
            return
        if flag == 1:
            # The result is discarded; result collection was omitted from this demo.
            worker(a, b, Ind)
            Bob.send(1)
if __name__ == '__main__':
    from multiprocessing.connection import wait

    N = 4    # number of worker processes
    M = 160  # delay points handled by each worker
    procs = []
    pipes = []
    Ind0 = np.arange(1600) * 10
    a = np.random.random(1998)
    b = np.random.random((17988, 700)) + 1j * np.random.random((17988, 700))
    for ii in range(N):
        # Worker ii gets the contiguous slice Ind0[ii*M:(ii+1)*M].
        Ind = Ind0[ii * M:(ii + 1) * M]
        # one duplex pipe per worker: parent keeps 'Alice', worker uses 'Bob'
        Alice, Bob = mp.Pipe()
        pipes.append({'Alice': Alice, 'Bob': Bob})
        procs.append(mp.Process(target=missions, args=(a, b, Ind, Bob)))
    for p in procs:
        p.start()

    # Workers to resume; use e.g. [0] to time a single worker on its own.
    tasks = list(range(N))
    for ii in tasks:
        pipes[ii]['Alice'].send(1)  # signal worker ii to start one mission

    # Block until every resumed worker reports completion. wait() sleeps until
    # a connection is readable, so the parent does not burn a core busy-polling
    # while the workers are being timed.
    pending = {pipes[ii]['Alice']: ii for ii in tasks}
    while pending:
        for conn in wait(list(pending)):
            if conn.recv() == 1:  # finished
                del pending[conn]

    # The workers loop waiting for more work, so stop them explicitly and
    # reap them so no terminated child is left unjoined.
    for p in procs:
        p.terminate()
    for p in procs:
        p.join()
    for pipe in pipes:
        pipe['Alice'].close()
        pipe['Bob'].close()