0

我试图用 numpy 并行化一些数据扩展,我发现并行化版本比串行版本花费的时间长几个数量级,所以我一定犯了一些愚蠢的错误。

首先,一些假数据来设置问题:

Ngroups = 1.e6
some_group_property = np.random.uniform(0, 100, Ngroups)
mem1_occupation = np.random.random_integers(0, 5, Ngroups)
mem2_occupation = np.random.random_integers(0, 5, Ngroups)
occupation_list = [mem1_occupation, mem2_occupation]

现在进行串行计算:我将组数据扩展为组成员的数组:

mem1_property = np.repeat(some_group_property, mem1_occupation)
mem2_property = np.repeat(some_group_property, mem2_occupation)

这是并行版本:

import functools
from joblib import Parallel, delayed

def expand_data(prop, occu_list, index):
    return np.repeat(prop, occu_list[index])
exp_data_1argfunc = functools.partial(expand_data, some_group_property, occupation_list)

result = Parallel(n_jobs=2)(delayed(exp_data_1argfunc)(i) for i in range(len(occupation_list)))  

我在 4 核机器上运行此代码,因此原则上为两个群体独立执行计算应该会给我大约 2 倍的加速。相反,串行计算需要约 0.1 秒,而并行计算需要 9 秒。这里发生了什么?

4

1 回答 1

2

首先:

我在 4 核机器上运行此代码,因此原则上为两个群体独立执行计算应该会给我大约 2 倍的加速。

不。一般来说,与线程数成线性比例的加速将是绝对的最佳情况,假设:

  1. 决定执行代码需要多长时间的限制因素是所需的 CPU 时钟周期数(即不是磁盘 I/O、内存分配等)
  2. 您的代码中没有任何部分无法并行化
  3. 并行化不会产生额外的开销

在实践中,这些标准永远不会完全满足,所以你绝对不应该自动假设线性加速是可能的。

functools.partial话虽如此,您的并行示例可以通过摆脱中间函数声明来大大加快速度:

def parallel_1(some_group_property, occupation_list):
    """ original version """
    exp_partial = functools.partial(expand_data, some_group_property,
                                    occupation_list)
    return Parallel(n_jobs=2)(
        delayed(exp_partial)(i)
        for i in range(len(occupation_list))
    )


def parallel_2(some_group_property, occupation_list):
    """ get rid of functools.partial """
    return Parallel(n_jobs=2)(
        delayed(expand_1)(some_group_property, occupation_list, i)
        for i in range(len(occupation_list))
    )


In [40]: %timeit parallel_1(some_group_property, occupation_list)
1 loops, best of 3: 7.24 s per loop

In [41]: %timeit parallel_2(some_group_property, occupation_list)
1 loops, best of 3: 375 ms per loop

在 Pythonmultiprocessing中,函数及其参数在发送到工作线程之前被腌制,然后在工作线程中解除腌制并执行函数。我怀疑减速可能与functools.partial更难腌制/解封的物体有关,尽管我真的不确定原因是什么。

您还可以通过仅传递特定线程所需的“职业”数组而不是包含所有线程的列表来做得更好:

def parallel_3(some_group_property, occupation_list):
    """ pass only a single occu array to each worker thread """
    return Parallel(n_jobs=2)(
        delayed(np.repeat)(some_group_property, oo)
        for oo in occupation_list
    )

In [44]: %timeit parallel_3(some_group_property, occupation_list)
1 loops, best of 3: 353 ms per loop

但是,这仍然无法与单线程版本的性能相匹配:

def serial_version(some_group_property, occupation_list):
    return [np.repeat(some_property_group, oo)
            for oo in occupation_list]

In [46]: %timeit serial_version(some_group_property, occupation_list)
10 loops, best of 3: 46.1 ms per loop

这可能只是意味着并行化所涉及的额外开销(启动两个工作线程、腌制/取消选取函数及其参数等)大大超过了并行计算两个数组所带来的性能提升。

我想您可能会在更大的数组中看到并行化的一些好处,其中并行版本在实际执行有用的计算而不是仅仅设置和终止工作线程上花费更多的时间。

于 2015-01-20T02:14:44.713 回答