我遇到了一个无法通过在网络上进行搜索来解决的问题。
我正在使用下面的最小代码。目标是通过多处理(使用 ProcessPoolExecutor)运行一些函数“f_sum”数百万次。我正在通过元组“args”列表添加多个参数。此外,该函数应该使用某种对所有执行都相同的数据(在示例中它只是一个数字)。出于内存原因,我不想将数据添加到“args”元组中。
到目前为止,我发现的唯一选择是将数据添加到“if name == ' main '”之外。这将(出于某种我不明白的原因)使变量可用于所有进程。但是,无法更新。此外,我真的不想在外部进行数据定义,因为在实际代码中它将基于数据导入并且可能需要额外的操作。
希望您能帮助并提前致谢!
PS:我在 Win 10 上使用 Python 3.7.9。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
data = 0 # supposed to be a large data set & shared among all calculations)
num_workers = 6 # number of CPU cores
num_iterations = 10 # supposed to be large number
def f_sum(args):
(x,y) = args
print('This is process', x, 'with exponent:', y)
value = 0
for i in range(10**y):
value += i
return value/10**y + data
def multiprocessing(func, args, workers):
with ProcessPoolExecutor(workers) as executor:
results = executor.map(func, args)
return list(results)
if __name__ == '__main__':
data = 0.5 # try to update data, should not be part of 'args' due to memory
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,8)))
result = multiprocessing(f_sum, args, num_workers)
if np.abs(result[0]-np.round(result[0])) > 0:
print('data NOT updated')
编辑原始问题:
>> 性能示例1
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
data = np.random.randint(0,100,size=data_size)
# data = np.linspace(0,data_size,data_size+1, dtype=np.uintc)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def multiprocessing(func, args, workers):
with ProcessPoolExecutor(workers) as executor:
results = executor.map(func, args)
return list(results)
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
result = multiprocessing(f_sum, args, num_workers)
print(f'expected result: {data[-1]}, actual result: {np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
>> 输出
This is process 99 random number: 6 last data 9
expected result: 86, actual result: [ 3. 9. 29. 58.]
total time: 11.760863542556763
如果使用 randint 会导致错误的结果。对于 linspace 结果是正确的。
>> 性能示例 2 - 基于回答中的建议
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from multiprocessing import Array
import time
data_size = 10**8
num_workers = 4
num_sum = 10**7
num_iterations = 100
input = np.random.randint(0, 100, size=data_size)
# input = np.linspace(0, data_size, data_size + 1, dtype=np.uintc)
def f_sum(args):
(x,y) = args
print('This is process', x, 'random number:', y, 'last data', data[-1])
value = 0
for i in range(num_sum):
value += i
result = value - num_sum*(num_sum-1)/2 + data[-1]
return result
def init_pool(the_data):
global data
data = the_data
def multiprocessing(func, args, workers, input):
data = Array('i', input, lock=False)
with ProcessPoolExecutor(max_workers=workers, initializer=init_pool, initargs=(data,)) as executor:
results = list(executor.map(func, args))
return results
if __name__ == '__main__':
t0 = time.time()
args = []
for k in range(num_iterations):
args.append((k, np.random.randint(1,10)))
result = multiprocessing(f_sum, args, num_workers, input)
print(f'expected result: {input[-1]}, actual result:{np.unique(result)}')
t1 = time.time()
print(f'total time: {t1-t0}')
>> 输出
This is process 99 random number: 7 last data 29
expected result: 29, actual result: [29.]
total time: 30.8266122341156
@Booboo
我在原始问题中添加了两个示例,“性能示例 2”基于您的代码。第一个有趣的发现是,如果数据数组是用随机整数初始化的,我的原始代码实际上会给出不正确的结果。我注意到,每个进程本身都会初始化数据数组。由于它基于随机数,因此每个进程使用不同的数组进行计算,甚至与主进程不同。因此,该用例不适用于此代码,在您的代码中它始终是正确的。
但是,如果使用 linspace,它会起作用,因为每次都会给出相同的结果。对于从文件中读取一些数据的用例(这是我的实际用例)也是如此。示例 1 仍然比示例 2 快大约 3 倍,我认为时间主要用于在您的方法中初始化数组。
关于内存使用,我在任务管理器中看不到相关差异。即使形状不同,这两个示例都会产生类似的记忆增加。
我仍然相信您的方法是正确的方法,但是,在上面的示例中,内存使用情况似乎相似并且速度较慢。