1

下面的代码是一个人为的示例,它模拟了我使用多处理来加速代码的实际问题。代码运行在Windows 10 64-bit OS, python 3.7.5, 和ipython 7.9.0


转换函数(这些函数将用于转换数组main()

from itertools import product
from functools import partial

from numba import njit, prange
import multiprocessing as mp
import numpy as np

@njit(parallel= True)
def transform_array_c(data, n):

    ar_len= len(data)

    sec_max1= np.empty(ar_len, dtype = data.dtype)
    sec_max2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_max1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_max= data[n*sec+ n-1]
        s1_max= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] > s2_max:
                s2_max= data[n*sec+i]
            sec_max2[n*sec+i]= s2_max

        sec_max1[n*sec+ n-1]= sec_max2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] > s1_max:
                    s1_max= data[n*sec+n+i]
                sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])

            else:
                break

    return sec_max1  

@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
    msd_temp = np.empty(array1.shape[0])

    K = array2[n-1]

    rs_x= array1[0] - K
    rs_xsq = rs_x *rs_x

    msd_temp[0] = np.nan

    for i in range(1,n):
        rs_x += array1[i] - K
        rs_xsq += np.square(array1[i] - K)
        msd_temp[i] = np.nan

    y_i = array2[n-1] - K
    msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    for i in range(n, array1.shape[0]):
        rs_x = array1[i] - array1[i-n]+ rs_x
        rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
        y_i = array2[i] - K

        msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    return msd_temp 

@njit(cache= True)
def transform_array_a(data, n):
    result = np.empty(data.shape[0], dtype= data.dtype)
    alpharev = 1. - 2 / (n + 1)
    alpharev_exp = alpharev

    e = data[0]
    w = 1.

    if n == 2: result[0] = e
    else:result[0] = np.nan

    for i in range(1, data.shape[0]):
        w += alpharev_exp
        e = e*alpharev + data[i]

        if i > n -3:result[i] = e / w
        else:result[i] = np.nan

        if alpharev_exp > 3e-307:alpharev_exp*= alpharev
        else:alpharev_exp=0.

    return result

多处理部分

def func(tup, data):    #<-------------the function to be run among all 
    a_temp= a[tup[2][0]]

    idx1 = a_temp > a[tup[2][1]]
    idx2= a_temp < b[(tup[2][1], tup[1][1])]

    c_final = c[tup[0][1]][idx1 | idx2]
    data_final= data[idx1 | idx2]

    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

def setup(a_dict, b_dict, c_dict):    #initialize the shared dictionaries
    global a,b,c
    a,b,c = a_dict, b_dict, c_dict

def main(a_arr, b_arr, c_arr, common_len):
    np.random.seed(0)
    data_array= np.random.normal(loc= 24004, scale=500, size= common_len)

    a_size = a_arr[-1] + 1
    b_size = len(b_arr)
    c_size = len(c_arr)

    loop_combo = product(enumerate(c_arr),
                         enumerate(b_arr),
                         (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
                         )
    result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32) 

    ###################################################
    #This part simulates the heavy-computation in the actual problem

    a= {}
    b= {}
    c= {}

    for i in range(1, a_arr[-1]+1):

        a[i]= transform_array_a(data_array, i)
        if i in a_arr:
            for j in b_arr:
                b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j


    for i in c_arr:
        c[i]= transform_array_c(data_array, i)

    ###################################################    
    with mp.Pool(processes= mp.cpu_count() - 1,
                 initializer= setup,
                 initargs= [a,b,c]
                 ) as pool:
        mp_res= pool.imap_unordered(partial(func, data= data_array),
                                    loop_combo
                                    )

        for item in mp_res:
            result[item[0]] =item[1]


    return result


if __name__ == '__main__':
    mp.freeze_support()

    a_arr= np.arange(2,44,2)
    b_arr= np.arange(0.4,0.8, 0.20)
    c_arr= np.arange(2,42,10)
    common_len= 440000

    final_res= main(a_arr, b_arr, c_arr, common_len)

出于性能原因,所有进程之间使用多个共享的“只读”字典以减少冗余计算(在实际问题中,在所有进程之间使用共享字典后,总计算时间减少了 40%)。但是,在我的实际问题中使用共享字典后,ram 的使用率变得高得多;我的 6C/12T Windows 计算机的内存使用量从(峰值 8.2GB,空闲 5.0GB)变为(峰值 23.9GB,空闲 5.0GB),为了获得 40% 的加速,付出的成本有点太高了。

当必须在进程之间使用多个共享数据时,高内存使用是否不可避免?可以对我的代码做些什么,以便在使用尽可能低的内存的同时使其尽可能快?

先感谢您


注意:我尝试使用imap_unordered()而不是map因为我听说当输入迭代很大时它应该减少内存使用量,但老实说我看不到内存使用量的改进。也许我在这里做错了什么?


编辑:由于答案中的反馈,我已经更改了代码的繁重计算部分,使其看起来不那么虚拟,并且类似于实际问题中的计算。

4

1 回答 1

2

在 Windows 中运行的python中操作共享字典时内存使用量过高multiprocessing

在我们进入细节之前,揭开这个问题的神秘面纱是公平的 -原始代码中没有共享字典,它们被操纵的越少(是的,每个a,b,c确实被“分配”到对dict_a, dict_b, dict_c但没有它们是共享的,但只是像在 Windows 类 O/S-es 中那样被复制multiprocessing。没有写入“到” dict-s (只是从它们的任何一个副本进行非破坏性读取)

类似地,np.memmap()-s 可以将最初提出的数据的某些部分放到磁盘空间上(这样做的代价是+承受一些(延迟屏蔽的)随机读取延迟,~ 10 [ms]而不是~ 0.5 [ns]智能对齐的矢量化内存模式被设计成性能热点),但这里不应该期望范式发生巨大的变化,因为“外部迭代器”几乎避免了任何智能对齐的缓存重用

可以对我的代码做些什么来使其尽可能快,同时使用尽可能低的内存?

第一个罪过是使用8B-int64来存储一个普通的 Bbit(这里还没有 Qbits 〜向本拿比量子研发团队致敬)

for i in c_arr:                                    # ~~ np.arange( 2, 42, 10 )
     np.random.seed( i )                           # ~ yields a deterministic state
     c[i] = np.random.poisson( size = common_len ) # ~ 440.000 int64-s with {0|1}

这在字典的所有副本中占用了6(进程)x 440000x 8B ~ 0.021 GB“走私” c,而每个这样的值都是确定性已知的,并且可以在相应的目标进程中生成 ALAP,只需知道i(确实不需要预先生成并多次复制~ 0.021 GB数据)

到目前为止,Windows 类 O/S缺少一个os.fork(),因此按照要求对尽可能多的复制 python 解释器会话(加上导入主模块)进行 python 完整副本(是的,RAM ...,是的,TIME) ,multiprocessing用于基于进程的分离(这样做是为了避免 GIL 锁有序的、纯[SERIAL]的、代码执行)


最好的下一步:
重构代码
以提高效率和性能

最好的下一步 - 重构代码,以最大限度地减少 6 个进程的“浅”(和昂贵)使用,但“外部” - 由中央迭代器(具有 ~ 18522 个项目loop_combo“独裁者”重复调用到“远程调度” func( tup, data ),以便获取一个简单的“DMA-tuple”-( (x,y,z), value )将一个存储value到中央进程中result--array float32)。

尝试增加计算“密度” - 因此尝试通过分而治之的方式重构代码(即,每个mp.pool进程在一个平滑块中计算一些非常大的、专用的参数子空间-空间覆盖(这里迭代地“从外面”)并且可以很容易地减少返回的结果块。这样做只会提高性能(最好没有任何形式的昂贵共享)。

这种重构将避免参数pickle/unpickle成本(附加开销 - 一次性(传递唯一的参数集值)和重复性(大约 18522 次执行的重复内存分配、累积和pickle/ -由于糟糕的呼号设计/工程造成的unpickle成本)np.arange( 440000 )

所有这些步骤将提高您的处理效率并减少不必要的 RAM 分配。

于 2019-11-05T01:32:33.983 回答