Basically, what I want is for worker_function to insert those 2s into ar:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(i=None, ar=None):
    val = 2
    ar[i] = val
    print(ar)


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar=ar)
    mp.Pool(1).map(func_part, range(2))
    print(ar)


if __name__ == '__main__':
    main()

So far, the only thing I've managed is to change ar inside worker_function, not the outer copy:

[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
[0. 0. 0. 0. 0.]

2 Answers

For performance, you should use shared memory here via multiprocessing.Array, to avoid rebuilding and sending the array between different processes over and over. The array is then the same in all processes, which is not the case in your example, where you are sending copies. That's also why the parent doesn't see the changes made in the workers.

import multiprocessing as mp
import numpy as np


def worker_function(i):
    global arr
    val = 2
    arr[i] = val
    print(mp.current_process().name, arr[:])


def init_arr(arr):
    globals()['arr'] = arr


def main():
    # as long as we don't conditionally modify the same indices 
    # from multiple workers, we don't need the lock ...
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    mp.Pool(2, initializer=init_arr, initargs=(arr,)).map(worker_function, range(5))
    print(mp.current_process().name, arr[:])


if __name__ == '__main__':
    main()

Output:

ForkPoolWorker-1 [2, 0, 0, 0, 0]
ForkPoolWorker-2 [2, 2, 0, 0, 0]
ForkPoolWorker-1 [2, 2, 2, 0, 0]
ForkPoolWorker-2 [2, 2, 2, 2, 0]
ForkPoolWorker-1 [2, 2, 2, 2, 2]
MainProcess [2, 2, 2, 2, 2]

Process finished with exit code 0
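A related pattern (a sketch, not part of the original answer) is to view the same shared buffer as a numpy array with np.frombuffer, so both the workers and the parent use numpy indexing on one block of memory; np.int32 is assumed here to match the 'i' typecode:

```python
import multiprocessing as mp
import numpy as np


def init_arr(shared):
    # re-wrap the raw shared buffer as a numpy view in each worker
    globals()['arr'] = np.frombuffer(shared, dtype=np.int32)


def worker_function(i):
    arr[i] = 2  # writes go straight into the shared buffer


def main():
    shared = mp.Array('i', 5, lock=False)        # raw shared buffer, zero-initialized
    arr = np.frombuffer(shared, dtype=np.int32)  # numpy view on the same memory
    with mp.Pool(2, initializer=init_arr, initargs=(shared,)) as pool:
        pool.map(worker_function, range(5))
    print(arr)  # [2 2 2 2 2] -- all worker writes are visible in the parent


if __name__ == '__main__':
    main()
```

As in the answer above, no lock is needed as long as each task writes a distinct index.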
answered 2020-10-05T12:47:06.367

First of all, your arguments to worker_function are defined in the wrong order.

As you have observed, each process gets a copy of the array. The best you can do is return the modified array:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i): # put the arguments in the correct order!
    val = 2
    ar[i] = val
    #print(ar)
    return ar # return modified array


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    arrays = mp.Pool(2).map(func_part, range(2)) # pool size of 2, otherwise what is the point?
    for array in arrays:
        print(array)


if __name__ == '__main__':
    main()

Prints:

[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]

But now you are dealing with two separately modified arrays. You would have to add extra logic to merge the results of these two arrays into one:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i): # put the arguments in the correct order!
    val = 2
    ar[i] = val
    #print(ar)
    return ar # return modified array


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    arrays = mp.Pool(2).map(func_part, range(2)) # pool size of 2, otherwise what is the point?
    for i in range(2):
        ar[i] = arrays[i][i]
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[2. 2. 0. 0. 0.]

But it makes more sense for worker_function to return a tuple giving the index of the modified element and its new value:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i): # put the arguments in the correct order!
    return i, i + 3 # index, new value


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    results = mp.Pool(2).map(func_part, range(2))
    for index, value in results:
        ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[3. 4. 0. 0. 0.]

Of course, if worker_function modified multiple values, it would return a tuple of tuples.
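That variant could look like this (a minimal sketch extending the example above; the second index, i + 2, is an arbitrary illustration):

```python
import numpy as np
import multiprocessing as mp


def worker_function(i):
    # modify two elements: return a tuple of (index, new value) pairs
    return ((i, i + 3), (i + 2, i + 30))


def main():
    ar = np.zeros(5)
    results = mp.Pool(2).map(worker_function, range(2))
    for pairs in results:           # one tuple of pairs per task
        for index, value in pairs:  # apply each (index, value) update
            ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()
```

The parent simply iterates one level deeper when applying the results.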

Finally, if you really do need to pass the object to the child processes, there is another way, using a pool initializer:

import numpy as np
import multiprocessing as mp


def pool_initializer(ar):
    global the_array

    the_array = ar


def worker_function(i):
    return i, the_array[i] ** 2 # index, value


def main():
    ar = np.array([1,2,3,4,5])
    with mp.Pool(5, pool_initializer, (ar,)) as pool:
        results = pool.map(worker_function, range(5))
    for index, value in results:
        ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[ 1  4  9 16 25]
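On Python 3.8+, another option (a sketch, not part of the original answer) is multiprocessing.shared_memory, which lets every process build a numpy view on the same named memory block, so the workers' writes land directly in the parent's array; the dtype is pinned to np.int64 so parent and workers agree on the layout:

```python
import numpy as np
from multiprocessing import Pool, shared_memory


def worker_function(args):
    name, i = args
    # attach to the existing shared block by name and view it as numpy
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((5,), dtype=np.int64, buffer=shm.buf)
    arr[i] = arr[i] ** 2  # each task writes a distinct index, so no lock needed
    shm.close()


def main():
    src = np.array([1, 2, 3, 4, 5], dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
    arr = np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf)
    arr[:] = src  # copy the data into shared memory once
    with Pool(2) as pool:
        pool.map(worker_function, [(shm.name, i) for i in range(5)])
    print(arr)  # squared in place by the workers
    shm.close()
    shm.unlink()  # free the shared block when done


if __name__ == '__main__':
    main()
```

Unlike the initializer approach above, no result has to travel back through the pool at all.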
answered 2020-10-05T12:13:56.593