I have run into a problem that I could not solve by searching the web.

I am using the minimal code below. The goal is to run some function "f_sum" several million times with multiprocessing (using ProcessPoolExecutor). I am passing multiple arguments through a list of tuples "args". In addition, the function is supposed to use some kind of data that is the same for all executions (in the example it is just a single number). For memory reasons I do not want to add this data to the "args" tuples.

The only option I have found so far is to define the data outside of if __name__ == '__main__'. This will (for some reason I do not understand) make the variable available to all processes. However, it cannot be updated there. Also, I would really prefer not to define the data at the outer level, because in the real code it will be based on a data import and may need additional manipulation.

I hope you can help; thanks in advance!

PS: I am using Python 3.7.9 on Win 10.

from concurrent.futures import ProcessPoolExecutor
import numpy as np

data = 0  # supposed to be a large data set & shared among all calculations
num_workers = 6  # number of CPU cores
num_iterations = 10  # supposed to be large number


def f_sum(args):
    (x,y) = args
    print('This is process', x, 'with exponent:', y)
    value = 0
    for i in range(10**y):
        value += i
    return value/10**y + data


def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(workers) as executor:
        results = executor.map(func, args)
    return list(results)


if __name__ == '__main__':
    data = 0.5  # try to update data, should not be part of 'args' due to memory

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,8)))

    result = multiprocessing(f_sum, args, num_workers)

    if np.abs(result[0]-np.round(result[0])) > 0:
        print('data NOT updated')

Edit to the original question:

>> Performance Example 1

from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
data = np.random.randint(0,100,size=data_size)
# data = np.linspace(0,data_size,data_size+1, dtype=np.uintc)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(workers) as executor:
        results = executor.map(func, args)
    return list(results)

if __name__ == '__main__':
    t0 = time.time()

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    result = multiprocessing(f_sum, args, num_workers)

    print(f'expected result: {data[-1]}, actual result: {np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')

>> Output

This is process 99 random number: 6 last data 9
expected result: 86, actual result: [ 3.  9. 29. 58.]
total time: 11.760863542556763

With randint the results are wrong; with linspace the result is correct.

>> Performance Example 2 - based on the suggestion in the answer

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
input = np.random.randint(0, 100, size=data_size)
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def init_pool(the_data):
    global data
    data = the_data

def multiprocessing(func, args, workers, input):
    data = Array('i', input, lock=False)
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
        results = list(executor.map(func, args))
    return results

if __name__ == '__main__':
    t0 = time.time()
    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    result = multiprocessing(f_sum, args, num_workers, input)

    print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')

>> Output

This is process 99 random number: 7 last data 29
expected result: 29, actual result: [29.]
total time: 30.8266122341156

@Booboo

I have added two examples to the original question; "Performance Example 2" is based on your code. The first interesting finding is that my original code actually gives incorrect results if the data array is initialized with random integers. I noticed that every process initializes the data array by itself. Since it is based on random numbers, each process uses a different array for its calculation, and even a different one than the main process. So this use case does not work with that code, whereas with your code it is always correct.
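
A minimal sketch that makes this visible (assuming the default "spawn" start method on Windows; save it as its own script): every worker re-imports the module, so the module-level randint call runs again in each process and produces a different array.

from concurrent.futures import ProcessPoolExecutor
import numpy as np

# Under the "spawn" start method (the default on Windows), every worker
# process re-imports this module and re-executes this line, so each worker
# ends up with its own, different random array.
data = np.random.randint(0, 100, size=5)

def show(_):
    return list(data)  # what this particular worker sees

if __name__ == '__main__':
    print('main process sees:', list(data))
    with ProcessPoolExecutor(2) as executor:
        for seen in executor.map(show, range(2)):
            print('a worker sees:  ', seen)  # typically differs from the main process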

However, it does work with linspace, since that gives the same result every time. The same holds for the use case of reading some data from a file (which is my actual use case). Example 1 is still about 3x faster than Example 2, and I think the time is mainly spent on initializing the array in your approach.

Regarding memory usage, I do not see a relevant difference in Task Manager. Both examples produce a similar increase in memory, even though the shapes are different.

I still believe that your approach is the right one; however, in the example above the memory usage seems similar and it is slower.
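
To check this more precisely than Task Manager allows, the resident set size could be printed from inside each worker; a sketch assuming the third-party psutil package is installed (report_rss is a hypothetical helper, to be called once in the main process and once inside f_sum):

import os
import psutil  # third-party: pip install psutil

def report_rss(tag):
    # resident set size of the current process, in MiB
    rss = psutil.Process(os.getpid()).memory_info().rss / 2**20
    print(f'[{tag}] pid={os.getpid()} rss={rss:.1f} MiB')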


1 Answer


The most memory-efficient approach is to use shared memory so that all processes work on the same data. This would be absolutely required if the processes updated data. In the example below, since the access to data is read-only and I am using a simple array of integers, I use multiprocessing.Array with no lock specified. The "trick" is to initialize the pool by specifying the initializer and initargs arguments so that every process in the pool has access to this shared memory. I have made a couple of other changes to the code as well, which I have commented.

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array, cpu_count # new imports


def init_pool(the_data):
    global data
    data = the_data

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'with exponent:', y)
    value = 0
    for i in range(10**y):
        value += i
    return value/10**y + len(data) # just use the length of data for now

def multiprocessing(func, args, workers):
    data = Array('i', range(1000), lock=False) # read-only, integers 0, 1, 2, ... 999
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
        results = list(executor.map(func, args)) # create the list of results here
    print(results) # so that it can be printed out for demo purposes
    return results


if __name__ == '__main__':
    num_iterations = 10  # supposed to be large number
    #num_workers = 6  # number of CPU cores
    num_workers = cpu_count()  # number of CPU cores

    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,8)))

    result = multiprocessing(f_sum, args, num_workers)

    if np.abs(result[0]-np.round(result[0])) > 0:
        print('data NOT updated')

Prints:

This is process 0 with exponent: 2
This is process 1 with exponent: 1
This is process 2 with exponent: 4
This is process 3 with exponent: 3
This is process 4 with exponent: 5
This is process 5 with exponent: 1
This is process 6 with exponent: 5
This is process 7 with exponent: 2
This is process 8 with exponent: 6
This is process 9 with exponent: 6
[1049.5, 1004.5, 5999.5, 1499.5, 50999.5, 1004.5, 50999.5, 1049.5, 500999.5, 500999.5]
data NOT updated

Update to Example 2

See my comment to your question concerning Example 1.

Your Example 2 is still not ideal: the statement input = np.random.randint(0, 100, size=data_size) sits at global scope, so it is needlessly executed by every process as it is being initialized for use in the process pool. Below is an updated solution that also shows a way to have your worker function work directly with a numpy array that is backed by the multiprocessing.Array instance, so that the numpy array lives in shared memory. You do not have to use this technique for what you are doing, since you only use numpy to create random numbers (I am not sure why), but it is a useful one. Note that you should re-run your own code after moving the initialization of input, so that it is only executed once.

I do not get to work with numpy on a daily basis, but I have come to learn that it uses multiprocessing internally to implement many of its own functions. So it is not always the best match for use with multiprocessing, although that does not seem to apply here: even in the case below we are only indexing a single element of the array, and no subprocess will be used for that.

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
import ctypes

data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)


def to_shared_array(arr, ctype):
    '''Copy a numpy array into a flat shared-memory Array (no lock).'''
    shared_array = Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def f_sum(args):
    (x,y) = args
    print('This is process', x, 'random number:', y, 'last data', data[-1])
    value = 0
    for i in range(num_sum):
        value += i
    result = value - num_sum*(num_sum-1)/2 + data[-1]
    return result

def init_pool(shared_array, shape):
    global data
    data = to_numpy_array(shared_array, shape)

def multiprocessing(func, args, workers):
    # moved here from global scope so it is executed only once, by the main process
    input = np.random.randint(0, 100, size=data_size)
    shape = input.shape
    shared_array = to_shared_array(input, ctypes.c_long)
    with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(shared_array, shape)) as executor:
        results = list(executor.map(func, args))
    return input, results

if __name__ == '__main__':
    t0 = time.time()
    args = []
    for k in range(num_iterations):
        args.append((k, np.random.randint(1,10)))

    input, result = multiprocessing(f_sum, args, num_workers)

    print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
    t1 = time.time()
    print(f'total time: {t1-t0}')
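
For reference, a minimal sketch (separate from the solution above) confirming that the numpy view created this way really is zero-copy: writes through the view are visible through the ctypes array, because both share one buffer.

import ctypes
import numpy as np
from multiprocessing import Array

shared = Array(ctypes.c_long, 5, lock=False)  # raw ctypes array in shared memory
view = np.ctypeslib.as_array(shared)          # zero-copy numpy view of that buffer

view[0] = 42      # write through the numpy view...
print(shared[0])  # ...and it is visible via ctypes: prints 42
print(np.shares_memory(view, np.ctypeslib.as_array(shared)))  # prints True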