36

我正在尝试使用 IPython 的并行环境,到目前为止,它看起来很棒,但我遇到了问题。假设我有一个函数,在库中定义

def func(a,b):
   ...

当我想评估 a 的一个值和 b 的一堆值时,我会使用它。

[func(myA, b) for b in myLongList]

显然,真正的函数更复杂,但问题的本质是它需要多个参数,我只想映射其中一个。问题是 map、@dview.parallel 等映射所有参数。

所以假设我想得到 func(myA, myLongList) 的答案。这样做的明显方法是使用 functools.partial 或就像

dview.map_sync(lambda b: func(myA, b),   myLongList)

但是,这在远程计算机上无法正常工作。原因是当 lambda 表达式被腌制时,不包括 myA 的值,而是使用远程机器上本地范围的 myA 的值。当闭包被腌制时,它们关闭的变量不会。

我能想到的两种实际可行的方法是为每个参数手动构造列表,并对所有参数进行映射,

dview.map_sync(func, [myA]*len(myLongList), myLongList)   

或者可怕地使用数据作为函数的默认参数,迫使它被腌制:

# Can't use a lambda here b/c lambdas don't use default arguments :(
def parallelFunc(b, myA = myA):
    return func(myA, b)

dview.map_sync(parallelFunc, myLongList)

真的,当真正的函数需要很多参数并且更复杂时,这一切似乎都被扭曲了。有一些惯用的方法吗?就像是

@parallel(mapOver='b')
def  bigLongFn(a, b):
   ...

但据我所知,不存在像“mapOver”这样的东西。我可能对如何实现它有一个想法......这只是一个非常基本的操作,应该存在支持,所以我想检查我是否遗漏了一些东西。

4

5 回答 5

15

我可以对 batu 的答案进行一些改进(我认为这是一个很好的答案,但可能没有详细记录你为什么使用这些选项)。ipython 文档目前在这一点上也严重不足。所以你的功能是这样的:

def myfxn(a,b,c,d):
  ....
  return z

并存储在名为mylib的文件中。假设 b、c 和 d 在您的运行过程中是相同的,因此您编写了一个 lambda 函数以将其简化为 1 参数函数。

import mylib
mylamfxn=lambda a:mylib.myfxn(a,b,c,d)

你想运行:

z=dview.map_sync(mylamfxn, iterable_of_a)

在梦境中,一切都会如此神奇地运作。但是,首先您会收到“未找到 mylib”的错误,因为 ipcluster 进程尚未加载 mylib。如有必要,请确保 ipcluster 进程在其 python 路径中具有“mylib”并且位于 myfxn 的正确工作目录中。然后你需要添加到你的python代码:

dview.execute('import mylib')

import mylib它在每个进程上运行命令。如果你再试一次,你会得到一个“全局变量 b 未定义”的错误,因为当变量在你的 python 会话中时,它们不在 ipcluster 进程中。但是,python 提供了一种将一组变量复制到子进程的方法。继续上面的例子:

mydict=dict(b=b, c=c, d=d)
dview.push(mydict)

现在所有子进程都可以访问 b、c 和 d。然后你可以运行:

z=dview.map_sync(mylamfxn, iterable_of_a)

它现在应该像宣传的那样工作。无论如何,我是使用 python 进行并行计算的新手,发现这个线程很有用,所以我想我会尝试帮助解释一些让我有点困惑的点......

最终代码将是:

import mylib

#set up parallel processes, start ipcluster from command line prior!
from IPython.parallel import Client
rc=Client()
dview=rc[:]

#...do stuff to get iterable_of_a and b,c,d....

mylamfxn=lambda a:mylib.myfxn(a,b,c,d)

dview.execute('import mylib')
mydict=dict(b=b, c=c, d=d)
dview.push(mydict)
z=dview.map_sync(mylamfxn, iterable_of_a)

这可能是让几乎所有令人尴尬的并行代码在 python 中并行运行的最快和最简单的方法......

更新您还可以使用 dview 推送所有数据而无需循环,然后使用 lview (即lview=rc.load_balanced_view(); lview.map(...)以负载平衡方式进行实际计算。

于 2013-07-24T23:45:25.123 回答
6

这是我给 StackOverflow 的第一条消息,所以请保持温和;)我试图做同样的事情,并想出了以下内容。我很确定这不是最有效的方法,但似乎有点工作。现在需要注意的一个问题是,出于某种原因,我只看到两台发动机以 100% 的速度工作,其他的几乎处于闲置状态......

为了在 map 中调用多 arg 函数,我首先在我的个人 parallel.py 模块中编写了这个例程:

def map(r,func, args=None, modules=None):
"""
Before you run parallel.map, start your cluster (e.g. ipcluster start -n 4)

map(r,func, args=None, modules=None):
args=dict(arg0=arg0,...)
modules='numpy, scipy'    

examples:
func= lambda x: numpy.random.rand()**2.
z=parallel.map(r_[0:1000], func, modules='numpy, numpy.random')
plot(z)

A=ones((1000,1000));
l=range(0,1000)
func=lambda x : A[x,l]**2.
z=parallel.map(r_[0:1000], func, dict(A=A, l=l))
z=array(z)

"""
from IPython.parallel import Client
mec = Client()
mec.clear()
lview=mec.load_balanced_view()
for k in mec.ids:
  mec[k].activate()
  if args is not None:
    mec[k].push(args)
  if modules is not None:
    mec[k].execute('import '+modules)
z=lview.map(func, r)
out=z.get()
return out

如您所见,该函数采用 args 参数,该参数是头节点工作区中的参数字典。然后将这些参数推送到引擎。那时它们成为本地对象,可以直接在函数中使用。例如,在上面注释中给出的最后一个示例中,使用 l 引擎局部变量对 A 矩阵进行切片。

我必须说,即使上述功能有效,我目前也不是 100% 满意。如果我能想出更好的东西,我会在这里发布。

更新:2013/04/11 我对代码做了一些小改动: - 激活语句缺少括号。导致它无法运行。- 将 mec.clear() 移到函数的顶部,而不是末尾。我还注意到如果我在 ipython 中运行它效果最好。例如,如果我使用上述函数作为“python ./myparallelrun.py”运行脚本,但如果我在 ipython 中使用“%run ./myparallelrun.py”运行它,则可能会出错。不知道为什么...

于 2013-04-09T19:47:36.733 回答
0

一种优雅的方法是使用偏函数。

如果你知道你希望 foo 的第一个参数是 myArg,你可以创建一个新的函数 bar

from functools import partial
bar = partial(foo, myARg)

bar(otherArg)然后将返回foo(myArg,otherArg)

于 2012-09-20T19:21:16.397 回答
0

让我们以此为基础:

dview.map_sync(func, [myA]*len(myLongList), myLongList)

也许以下会起作用:

from itertools import izip_longest
dview.map_sync(func, izip_longest(myLongList, [], fillvalue=myA))

例子:

>>> # notice that a is a tuple
... concat = lambda a: '%s %s' % a
>>> mylonglist = range(10)
>>> from itertools import izip_longest
>>> map(concat, izip_longest(mylonglist, [], fillvalue='mississippi'))
['0 mississippi', '1 mississippi', '2 mississippi', '3 mississippi',
'4 mississippi', '5 mississippi', '6 mississippi', '7 mississippi',
'8 mississippi', '9 mississippi']
于 2012-10-13T04:33:53.777 回答
0

我正在发布 Alex S. 评论作为答案。这可能是解决这个问题的正确方法:

只需使用 lambda 进行部分应用。我知道这看起来很奇怪,但是使用 my_f = lambda a,my=other,arguments=go,right=here : f(a,my,arguments,right) 是最简单的方法,不会陷入酸洗和推挤问题.

于 2017-11-11T18:58:28.413 回答