
I have a large list of base64-encoded binary strings that I used to process in a single function, like this:

""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')
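
For reference, that dtype string creates a NumPy structured array: each of the 250 records holds one scalar float32 plus a (250000, 2) float32 block, and it is that block which data[i][1] selects in the code below. A quick sketch (the 'f0'/'f1' names are NumPy's auto-generated defaults, not from the original code):

    import numpy as np

    data = np.zeros(250, dtype='float32, (250000,2)float32')
    print data.dtype.names   # ('f0', 'f1') -- auto-generated field names
    print data[0][1].shape   # (250000, 2) -- the block the loops below fill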

def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
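
As an aside, the inner pack/unpack loop reinterprets each big-endian unsigned 32-bit integer as a float32, pair by pair; NumPy can do the same reinterpretation in bulk. A sketch of an equivalent per-string decode, assuming the buffer really is big-endian float32 (m/z, intensity) pairs, as the '>%dL' format suggests:

    import base64
    import numpy as np

    def decode_peak_string(x):
        """Equivalent of the pack/unpack loop for one base64 string."""
        data_buff = base64.b64decode(x)
        # '>f4' = big-endian float32: the same bytes the struct round-trip reads
        values = np.frombuffer(data_buff, dtype='>f4')
        return values.reshape(-1, 2)   # column 0: m/z, column 1: intensity

With that, each data[rt_counter][1] block can be filled with a single slice assignment instead of a Python-level loop, which for this kind of byte shuffling is often a bigger win than adding processes.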

I have been reading about multiprocessing and figured I would give it a try to see whether I could get a big performance boost, so I rewrote my function as two (a helper and a "caller"), as follows:

def numpy_array(data, peaks):
    processors=mp.cpu_count #Might as well throw this directly in the mp.Pool (just for clarity for now)
    pool = mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        pool.map(decode(data,chunk,counter))

def decode(data,chunk,counter):
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1      

The program runs but only uses 100-110% of CPU (according to top), and once it should be finished it throws TypeError: map() takes at least 3 arguments (2 given) at me. Could anyone with more multiprocessing experience give me a hint about what to look out for (i.e. what could cause the TypeError), and what might be causing the low CPU usage?

-- Code after incorporating the answers --

def decode((data,chunk,counter)):
    print len(chunk), counter
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

def numpy_array(data, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors=mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size=int(len(peaks)/processors)
    map_parameters=[]
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size]
        map_parameters.append((data,chunk,counter))
    pool.map(decode,map_parameters) 

So far this latest version "works": it fills the array inside the processes (the array does contain values), but once all the processes are done, accessing the array yields only zeros, because each process gets a local copy of the array.
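
One way around that is not to write into data from the workers at all: each worker returns its decoded chunk, and the parent process, which owns the real array, does the writing. A minimal sketch of that approach (the decode_chunk name and the (offset, results) return convention are illustrative, not from the original code):

    import base64
    import multiprocessing as mp
    import numpy as np

    def decode_chunk(args):
        chunk, offset = args             # pool.map passes one tuple per task
        results = []
        for x in chunk:
            data_buff = base64.b64decode(x)
            pairs = np.frombuffer(data_buff, dtype='>f4').reshape(-1, 2)
            results.append(pairs)
        return offset, results           # shipped back to the parent process

    def numpy_array(data, peaks):
        processors = mp.cpu_count()
        pool = mp.Pool(processes=processors)
        chunk_size = len(peaks) // processors
        # Note: if len(peaks) is not a multiple of processors, the last
        # chunk should be peaks[i*chunk_size:] to avoid dropping the tail.
        tasks = [(peaks[i*chunk_size:(i+1)*chunk_size], i*chunk_size)
                 for i in range(processors)]
        for offset, results in pool.map(decode_chunk, tasks):
            for j, pairs in enumerate(results):
                data[offset + j][1][:len(pairs)] = pairs

A shared-memory multiprocessing.Array wrapped in a NumPy view would also work, but returning the results keeps the bookkeeping simpler, at the cost of pickling each decoded chunk back to the parent.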


2 Answers


Something like this should work.

Note that pool.map takes a function and a list of arguments, one entry per call of that function. In your original example you are just calling the function yourself inside numpy_array.

The function must take only one argument, hence the packing of the arguments into a tuple and the rather odd-looking double parentheses in decode (which is called tuple unpacking).

def numpy_array(data, peaks):
    processors=4
    pool = mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    print range(processors)
    map_parameters = [] # new
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size]
        map_parameters.append((data,chunk,counter)) # new
    pool.map(decode, map_parameters) # new

def decode((data,chunk,counter)): # changed
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1
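
A side note on portability, not from the original answer: tuple parameters in a def, the double-parentheses trick used above, are a Python 2 only feature and were removed in Python 3. On Python 3.3+ the usual equivalent is Pool.starmap, which unpacks each argument tuple for you, so decode keeps three separate parameters:

    # Python 3.3+ sketch: starmap unpacks each (data, chunk, counter) tuple
    def decode(data, chunk, counter):
        pass  # same body as the Python 2 version above

    pool.starmap(decode, map_parameters)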
answered 2013-04-12T08:14:45.803

The error is in your numpy_array function:

for i in range(processors):
    counter = i*chunk_size
    chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
    pool.map(decode(data,chunk,counter))

The problem is that you are calling map in sequence, so you are only running one process at a time. In addition, I don't think you are using map correctly: you are doing pool.map(f(*args)) whereas the signature is map(f, ['list', 'of', 'data']). As written, decode(data, chunk, counter) runs the whole decode in the parent process (which is why top shows only one busy core) and then hands its return value, None, to pool.map, which is what raises the TypeError.

I would use a partial so that you don't create copies of data, as I assume the array is quite large or could grow larger in the future.

This should be:

import functools
decode_with_data = functools.partial(decode, data)
args = []
for i in range(processors):
    counter = i * chunk_size
    chunk = peaks[i*chunk_size:(i+1)*chunk_size]
    args.append((chunk, counter))
pool.map(decode_with_data, args)
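
Note that for this pool.map call to line up, decode needs a matching signature: the partial supplies data as the first positional argument, and map passes each (chunk, counter) tuple as the second. A sketch of the corresponding definition (using Python 2's tuple parameters, as in the answer above):

    def decode(data, (chunk, counter)):   # data from the partial, tuple from map
        for x in chunk:
            pass  # same decoding loop as before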
answered 2013-04-12T08:15:10.400