24

我知道有关此主题的几个问题和答案,但尚未找到此特定问题的令人满意的答案:

在通过 numpy/scipy 函数操作 numpy 数组的 python 循环中进行简单的共享内存并行化的最简单方法是什么?

我不是在寻找最有效的方法,我只是想要一些简单的实现,当循环不并行运行时不需要大量重写。就像 OpenMP 在低级语言中实现一样。

我在这方面看到的最佳答案是this one,但这是一种相当笨拙的方式,需要将循环表达为一个接受单个参数的函数,几行共享数组转换 crud,似乎需要从 调用并行函数__main__,从交互式提示(我花了很多时间)似乎不能很好地工作。

凭借 Python 的所有简单性,这真的是并行化循环的最佳方式吗?真的吗?这对于以 OpenMP 方式进行并行化来说是微不足道的。

我煞费苦心地阅读了多处理模块的不透明文档,却发现它是如此通用,以至于它似乎适用于除了简单的循环并行化之外的所有东西。我对设置管理器、代理、管道等不感兴趣。我只有一个简单的循环,完全并行,任务之间没有任何通信。使用 MPI 来并行化这样一个简单的情况似乎有点过头了,更不用说在这种情况下内存效率低下。

我还没有时间了解用于 Python 的众多不同的共享内存并行包,但想知道是否有人在这方面有更多经验并且可以向我展示一种更简单的方法。请不要建议使用 Cython 之类的串行优化技术(我已经使用它),或使用 BLAS 之类的并行 numpy/scipy 函数(我的情况更通用,更并行)。

4

3 回答 3

18

使用 Cython 并行支持:

# asd.pyx
from cython.parallel cimport prange

import numpy as np

def foo():
    cdef int i, j, n

    x = np.zeros((200, 2000), float)

    n = x.shape[0]
    for i in prange(n, nogil=True):
        with gil:
            for j in range(100):
                x[i,:] = np.cos(x[i,:])

    return x

在 2 核机器上:

$ cython asd.pyx
$ gcc -fPIC -fopenmp -shared -o asd.so asd.c -I/usr/include/python2.7
$ export OMP_NUM_THREADS=1
$ time python -c 'import asd; asd.foo()'
real    0m1.548s
user    0m1.442s
sys 0m0.061s

$ export OMP_NUM_THREADS=2
$ time python -c 'import asd; asd.foo()'
real    0m0.602s
user    0m0.826s
sys 0m0.075s

这可以并行运行,因为np.cos(与其他 ufunc 一样)释放了 GIL。

如果您想以交互方式使用它:

# asd.pyxbdl
def make_ext(modname, pyxfilename):
    from distutils.extension import Extension
    return Extension(name=modname,
                     sources=[pyxfilename],
                     extra_link_args=['-fopenmp'],
                     extra_compile_args=['-fopenmp'])

和(删除asd.soasd.c第一):

>>> import pyximport
>>> pyximport.install(reload_support=True)
>>> import asd
>>> q1 = asd.foo()
# Go to an editor and change asd.pyx
>>> reload(asd)
>>> q2 = asd.foo()

所以是的,在某些情况下,您可以仅使用线程进行并行化。OpenMP 只是一个花哨的线程包装器,因此这里只需要 Cython 以实现更简单的语法。如果没有 Cython,您可以使用该threading模块 --- 与多处理类似(并且可能更健壮),但您不需要做任何特殊的事情来将数组声明为共享内存。

然而,并不是所有的操作都会释放 GIL,所以 YMMV 用于性能。

***

从其他 Stackoverflow 答案中提取的另一个可能有用的链接 --- 多处理的另一个接口:http ://packages.python.org/joblib/parallel.html

于 2012-10-26T19:52:48.513 回答
4

使用映射操作(在这种情况下multiprocessing.Pool.map())或多或少是在单台机器上并行化循环的规范方法。除非并且直到内置map()被并行化。

可以在此处找到不同可能性的概述。

您可以将openmp与 python(或者更确切地说 cython)一起使用,但它看起来并不容易。

IIRC,如果只运行多处理的东西__main__是必要的,因为它与 Windows 兼容。由于 windows 缺少fork(),它会启动一个新的 python 解释器,并且必须在其中导入代码。

编辑

Numpy 可以并行化一些操作,例如dot()vdot()以及innerproduct(),当配置了一个良好的多线程 BLAS 库(例如OpenBLAS)时。(另见这个问题。)

由于 numpy 数组操作主要是按元素进行的,因此似乎可以将它们并行化。但这将涉及为 python 对象设置共享内存段,或者将数组分成几部分并将它们提供给不同的进程,这与所做的没什么不同multiprocessing.Pool。无论采用何种方法,管理所有这些都会产生内存和处理开销。必须进行广泛的测试,以查看对于哪些大小的数组,这实际上值得付出努力。这些测试的结果可能会因硬件架构、操作系统和 RAM 数量而有很大差异。

于 2012-10-25T13:03:36.150 回答
0

ParallelRegression中mathDict( ) 类的.map( )方法完全符合您在两行代码中寻找的功能,这在交互式提示下应该很容易。它使用真正的多处理,因此并行运行的函数是可腌制的要求是不可避免的,但这确实提供了一种简单的方法来循环来自多个进程的共享内存中的矩阵。

假设您有一个可腌制的功能:

def sum_row( matrix, row ):
    return( sum( matrix[row,:] ) )

然后你只需要创建一个代表它的 mathDict( ) 对象,并使用 mathDict( ).map( ):

matrix = np.array( [i for i in range( 24 )] ).reshape( (6, 4) )

RA, MD = mathDictMaker.fromMatrix( matrix, integer=True )
res = MD.map( [(i,) for i in range( 6 )], sum_row, ordered=True )

print( res )
# [6, 22, 38, 54, 70, 86]

文档(上面的链接)解释了如何将位置参数和关键字参数的组合传递给您的函数,包括矩阵本身在任何位置或作为关键字参数。这应该使您能够使用您已经编写的几乎任何功能而无需修改它。

于 2017-01-22T11:34:52.600 回答