12

我已经阅读了很多关于共享数组的关于 SO 的问题,对于简单的数组来说似乎很简单,但我一直试图让它适用于我拥有的数组。

import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')

我尝试通过尝试以某种方式mp.Array接受将其转换为共享数组data,我还尝试使用 ctypes 来创建数组:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)

我设法让我的代码正常工作的唯一方法不是将数据传递给函数,而是传递一个编码字符串以进行解压缩/解码,但这最终会导致 n (字符串数)进程被调用,这似乎是多余的。我想要的实现是基于将二进制字符串列表切成 x (进程数)并将这个块传递给除了在本地修改之外工作dataindex进程data,因此关于如何使其共享的问题,任何示例工作使用自定义(嵌套)numpy 数组已经很有帮助。

PS:这个问题是Python 多处理的后续问题

4

2 回答 2

11

请注意,您可以从复杂 dtype 的数组开始:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

并将其视为同质 dtype 的数组:

In [5]: data2 = data.view('float32')

稍后,将其转换回复杂的 dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

更改 dtype 是一个非常快速的操作;它不会影响基础数据,只会影响 NumPy 解释它的方式。所以改变 dtype 几乎没有成本。

因此,您所阅读的有关具有简单(同质)dtype 的数组的内容可以很容易地通过上述技巧应用于您的复杂 dtype。


下面的代码从JF Sebastian 的答案中借用了许多想法,here 。

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    chunk, counter = arg
    print len(chunk), counter
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) / 4
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = int(len(peaks) / processors)
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive. 
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)

如果你能保证执行任务的各个进程

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

永远不要竞争更改相同位置的数据,那么我相信您实际上可以放弃使用锁

with shared_arr.get_lock():

但是我对您的代码的理解还不够好,无法确定,因此为了安全起见,我将锁包括在内。

于 2013-04-12T17:59:16.660 回答
0
from multiprocessing import Process, Array
import numpy as np
import time
import ctypes

def fun(a):
    a[0] = -a[0]
    while 1:
        time.sleep(2)
        #print bytearray(a.get_obj())
        c=np.frombuffer(a.get_obj(),dtype=np.float32)
        c.shape=3,3
        print 'haha',c


def main():
    a = np.random.rand(3,3).astype(np.float32)
    a.shape=1*a.size
    #a=np.array([[1,3,4],[4,5,6]])
    #b=bytearray(a)
    h=Array(ctypes.c_float,a)
    print "Originally,",h

    # Create, start, and finish the child process
    p = Process(target=fun, args=(h,))
    p.start()
    #p.join()
    a.shape=3,3
    # Print out the changed values
    print 'first',a
    time.sleep(3)
    #h[0]=h[0]+1
    print 'main',np.frombuffer(h.get_obj(), dtype=np.float32)



if __name__=="__main__":
    main()
于 2015-04-20T09:42:11.693 回答