17

可能重复:
如何并行化一个简单的 python 循环?

我对 Python 很陌生(使用 Python 3.2),我有一个关于并行化的问题。我有一个 for 循环,我希望在 Python 3.2 中使用“多处理”并行执行:

def computation:    
    global output

    for x in range(i,j):
        localResult = ... #perform some computation as a function of i and j
        output.append(localResult)

总的来说,我想在 i=0 到 j=100 的范围内执行此计算。因此,我想创建许多进程,每个进程都使用总范围的子域调用函数“计算”。关于如何做到这一点的任何想法?有没有比使用多处理更好的方法?

更具体地说,我想执行域分解,我有以下代码:

from multiprocessing import Pool

class testModule:

    def __init__(self):
        self

    def computation(self, args):
        start, end = args
        print('start: ', start, ' end: ', end)

testMod = testModule()
length = 100
np=4
p = Pool(processes=np)
p.map(yes tMod.computation, [(length, startPosition, length//np) for startPosition in    range(0, length, length//np)]) 

我收到一条提到 PicklingError 的错误消息。有什么想法可能是这里的问题吗?

4

2 回答 2

20

Joblib专为简单的并行循环而设计用于环绕多处理。我建议使用它而不是直接处理多处理。

简单的案例看起来像这样:

from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(foo)(i**2) for i in range(10))  # n_jobs = number of processes

一旦你理解它的语法很简单。我们正在使用生成器语法,其中delayed用于调用函数foo,其参数包含在后面的括号中。

在您的情况下,您应该使用生成器语法重写您的 for 循环,或者定义另一个函数(即“worker”函数)来执行单个循环迭代的操作并将其放入调用 Parallel 的生成器语法中。

在后一种情况下,您将执行以下操作:

Parallel(n_jobs=2)(delayed(foo)(parameters) for x in range(i,j))

wherefoo是您定义的用于处理 for 循环主体的函数。请注意,您不想追加到列表,因为 Parallel 无论如何都会返回一个列表。

于 2012-07-24T13:17:24.380 回答
6

在这种情况下,您可能希望定义一个简单的函数来执行计算并获取localResult.

def getLocalResult(args):
    """ Do whatever you want in this func.  
        The point is that it takes x,i,j and 
        returns localResult
    """
    x,i,j = args  #unpack args
    return doSomething(x,i,j)

现在在您的计算函数中,您只需创建一个工人池并映射本地结果:

import multiprocessing
def computation(np=4):
    """ np is number of processes to fork """
    p = multiprocessing.Pool(np)
    output = p.map(getLocalResults, [(x,i,j) for x in range(i,j)] )
    return output

我在这里删除了全局,因为它是不必要的(全局通常是不必要的)。在您的呼叫例程中,您应该只做output.extend(computation(np=4))或类似的事情。

编辑

这是您的代码的“工作”示例:

from multiprocessing import Pool

def computation(args):
    length, startPosition, npoints = args
    print(args)

length = 100
np=4
p = Pool(processes=np)
p.map(computation, [(startPosition,startPosition+length//np, length//np) for startPosition in  range(0, length, length//np)])

请注意,您没有工作,因为您使用实例方法作为您的函数。multiprocessing 启动新进程并通过 发送进程之间的信息pickle,因此,只能使用可以腌制的对象。请注意,无论如何使用实例方法确实没有意义。每个进程都是父进程的副本,因此进程中发生的任何状态更改都不会传播回父进程。

于 2012-07-24T13:18:35.147 回答