11

这应该是我的第三个也是最后一个问题,关于我尝试提高我使用 python 进行的一些统计分析的性能。我的代码有 2 个版本(单核与多处理),我希望通过使用多核来获得性能,因为我希望我的代码可以解压缩/解压缩很多二进制字符串,遗憾的是我注意到性能实际上是通过使用多个核心。

我想知道是否有人对我观察到的情况有可能的解释(向下滚动到 4 月 16 日更新以获取更多信息)?

程序的关键部分是函数 numpy_array (+ 多处理中的解码),下面的代码片段(可通过 pastebin 访问的完整代码,进一步如下):

def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1

多处理版本通过一组函数执行此操作,我将在下面显示关键 2:

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def numpy_array(shared_arr,peaks):
    processors=mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size=int(len(peaks)/processors)
        map_parameters=[]
        for i in range(processors):
            counter = i*chunk_size
            chunk=peaks[i*chunk_size:(i+1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode,map_parameters)

def decode ((chunk, counter)):
    data=tonumpyarray(shared_arr).view(
        [('f0','<f4'), ('f1','<f4',(250000,2))])
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            #with shared_arr.get_lock():
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

可以通过这些 pastebin 链接访问完整的程序代码

Pastebin 单核版本

用于多处理版本的 Pastebin

我使用包含 239 个时间点和每个时间点约 180k 测量对的文件观察到的性能对于单核约为 2.5m,对于多处理约为 3.5。

PS:前两个问题(我第一次尝试并行化):

  1. Python 多处理
  2. 让我的 NumPy 数组跨进程共享

——4月16日——

我一直在使用 cProfile 库对我的程序进行分析(cProfile.run('main()') 在 中__main__,这表明有一个步骤会减慢一切:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
23   85.859    3.733   85.859    3.733 {method 'acquire' of 'thread.lock' objects}

我在这里不明白的是thread.lock对象用于threading(据我了解)但不应该用于多处理,因为每个核心都应该运行一个线程(除了拥有自己的锁定机制),那么这是怎么发生的为什么一次通话需要 3.7 秒?

4

3 回答 3

2

共享数据是由于同步而导致速度下降的已知情况。

您可以在进程之间拆分数据,或者给每个进程一个独立的副本吗?然后,您的流程将不需要同步任何内容,直到所有计算完成为止。

然后我会让主进程将所有工作处理器的输出连接到一个连贯的集合中。

这种方法可能需要额外的 RAM,但现在 RAM 很便宜。

如果你问,我也对每个线程锁获取 3700 毫秒感到困惑。OTOH 分析可能会误认为这样的特殊调用。

于 2013-04-19T01:41:58.170 回答
0

至于您问题的最后一部分,Python 文档基本上说 multiprocessing.lock 是 threading.lock 的克隆。Acquire 对锁的调用可能需要很长时间,因为如果锁已经被获取,它将阻塞直到锁被释放。当多个进程竞争访问相同的数据时,这可能会成为一个问题,就像在您的代码中一样。因为我无法查看您的 pastebin,我只能猜测到底发生了什么,但很可能,您的进程正在长时间获取锁,这会阻止其他进程运行,即使有很多空闲 CPU 时间。这不应该受到 GIL 的影响,因为它应该只限制多线程应用程序,而不是多处理应用程序。所以,如何解决这个问题?我的猜测是,您有某种锁来保护您的共享数组,该数组在进程执行需要相对较长时间的密集计算时保持锁定,因此禁止访问其他进程,这些进程随后会阻塞其 lock.acquire()来电。假设您有足够的 RAM,我强烈支持建议在每个进程的地址空间中存储数组的多个副本的答案。但是,请注意,通过 map 传递大型数据结构可能会导致意想不到的瓶颈,因为它需要挑选和去酸洗。我强烈支持建议在每个进程的地址空间中存储数组的多个副本的答案。但是,请注意,通过 map 传递大型数据结构可能会导致意想不到的瓶颈,因为它需要挑选和去酸洗。我强烈支持建议在每个进程的地址空间中存储数组的多个副本的答案。但是,请注意,通过 map 传递大型数据结构可能会导致意想不到的瓶颈,因为它需要挑选和去酸洗。

于 2013-07-01T03:58:17.603 回答
0

您的 Pastebin 是空的。

问题是多处理使用 fork 如果它可用(而不是产生一个新的 python 进程)。分叉的进程共享相同的环境(例如文件描述符)。可能其中有一些锁。

这是一些令人沮丧的事情:多处理或os.fork,os.exec?

于 2013-06-15T13:16:58.410 回答