First, the arguments to worker_function are defined in the wrong order.
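To see why the order matters, here is a minimal sketch of how functools.partial binds arguments. It uses the built-in map and a plain list as a stand-in for the NumPy array, and swapped_worker is a hypothetical signature invented for illustration, not taken from your code:

```python
from functools import partial


def worker_function(ar, i):
    # ar is bound once by partial; i is supplied per task by map
    return ar, i


shared = [0, 0, 0]  # stand-in for the NumPy array
func_part = partial(worker_function, shared)

# map calls func_part(i), which becomes worker_function(shared, i)
print(list(map(func_part, range(2))))


def swapped_worker(i, ar):
    # Hypothetical signature with the index first
    return ar, i


# Binding by keyword also works when the array is not the first parameter
func_kw = partial(swapped_worker, ar=shared)
print(func_kw(1))
```

Positional arguments given to partial are always placed ahead of the ones supplied at call time, so the array has to be the first parameter unless you bind it by keyword.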
As you have observed, each process gets its own copy of the array. The best you can do is return the modified array:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):  # put the arguments in the correct order!
    val = 2
    ar[i] = val
    #print(ar)
    return ar  # return modified array

def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    with mp.Pool(2) as pool:  # pool size of 2, otherwise what is the point?
        arrays = pool.map(func_part, range(2))
    for array in arrays:
        print(array)

if __name__ == '__main__':
    main()
Prints:
[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
But now you are dealing with two separately modified arrays. You would have to add extra logic to merge the results of these two arrays into one:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):  # put the arguments in the correct order!
    val = 2
    ar[i] = val
    #print(ar)
    return ar  # return modified array

def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    with mp.Pool(2) as pool:  # pool size of 2, otherwise what is the point?
        arrays = pool.map(func_part, range(2))
    # arrays[i] is the copy returned by task i; only its element i was changed
    for i in range(2):
        ar[i] = arrays[i][i]
    print(ar)

if __name__ == '__main__':
    main()
Prints:
[2. 2. 0. 0. 0.]
But it makes more sense for worker_function to return a tuple giving the index of the modified element and its new value:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):  # put the arguments in the correct order!
    return i, i + 3  # index, new value

def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    with mp.Pool(2) as pool:
        results = pool.map(func_part, range(2))
    for index, value in results:
        ar[index] = value
    print(ar)

if __name__ == '__main__':
    main()
Prints:
[3. 4. 0. 0. 0.]
Of course, if worker_function modified multiple values, it would return a tuple of tuples.
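As a rough sketch of that multi-value case: the rule that each task updates elements i and i + 2 is made up purely for illustration, and apply_updates is a hypothetical helper name, not something from your code.

```python
import numpy as np
import multiprocessing as mp


def worker_function(i):
    # Hypothetical rule: each task updates two elements, i and i + 2
    return (i, i + 3), (i + 2, i + 10)  # a tuple of (index, new value) pairs


def apply_updates(ar, results):
    # Each entry in results is the tuple of (index, value) pairs from one task
    for updates in results:
        for index, value in updates:
            ar[index] = value
    return ar


def main():
    ar = np.zeros(5)
    with mp.Pool(2) as pool:
        results = pool.map(worker_function, range(2))
    print(apply_updates(ar, results))


if __name__ == '__main__':
    main()
```

The worker no longer needs the array at all here, so nothing large is pickled and sent to the pool; only the small update tuples travel back to the main process.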
Finally, if you do need to pass an object to your sub-processes, there is another way, which uses a pool initializer:
import numpy as np
import multiprocessing as mp

def pool_initializer(ar):
    global the_array
    the_array = ar

def worker_function(i):
    return i, the_array[i] ** 2  # index, value

def main():
    ar = np.array([1, 2, 3, 4, 5])
    with mp.Pool(5, pool_initializer, (ar,)) as pool:
        results = pool.map(worker_function, range(5))
    for index, value in results:
        ar[index] = value
    print(ar)

if __name__ == '__main__':
    main()
Prints:
[ 1  4  9 16 25]