I have the following code.

def main():
  (minI, maxI, iStep, minJ, maxJ, jStep, a, b, numProcessors) = sys.argv
  for i in range(minI, maxI, iStep):
    for j in range(minJ, maxJ, jStep): 
      p = multiprocessing.Process(target=functionA, args=(minI, minJ))
      p.start()
      def functionB((a, b)):
        subprocess.call('program1 %s %s %s %s %s %s' %(c, a, b, 'file1', 
          'file2', 'file3'), shell=True)
        for d in ['a', 'b', 'c']:
          subprocess.call('program2 %s %s %s %s %s' %(d, 'file4', 'file5', 
            'file6', 'file7'), shell=True)
      abProduct = list(itertools.product(range(0, 10), range(0, 10)))
      pool = multiprocessing.Pool(processes=numProcessors)
      pool.map(functionB, abProduct) 

It produces the following error.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 484, in run 
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 255, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

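The contents of functionA are not important and do not produce an error. The error seems to occur when I try to map functionB. How do I get rid of this error, and what is the best way to parallelize this code in Python 2.6?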

1 Answer

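The most likely reason you are seeing this behavior is the order in which you define your pool, objects, and functions. multiprocessing is not quite the same as using threads. Each process is spawned and loads its own copy of the environment. If you create functions in a scope that the worker processes may not be able to reach, or create objects before the pool, the pool will fail.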
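To see why scope matters, it may help to check picklability directly. `Pool.map` hands the target function to the workers through a queue, which pickles it, and functions are pickled by reference (module plus name) rather than by value. The snippet below is a minimal sketch of that idea; `can_pickle` and `make_nested` are hypothetical helper names introduced only for illustration.

```python
import pickle


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # Python 2 raises PicklingError here; Python 3 may raise AttributeError.
        return False


def make_nested():
    # Hypothetical helper: returns a function defined inside another function,
    # which therefore cannot be looked up by name at module level.
    def nested(x):
        return x
    return nested


if __name__ == "__main__":
    print(can_pickle(len))            # True: found by name in its module
    print(can_pickle(make_nested()))  # False: no module-level name to look up
```

A function that fails this check will also fail when passed to `pool.map`, which matches the `PicklingError` in the traceback above.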

First, try creating the pool before the big loop:

# sys.argv[0] is the script name, and the arguments arrive as strings
(minI, maxI, iStep, minJ, maxJ, jStep, a, b, numProcessors) = sys.argv[1:]
pool = multiprocessing.Pool(processes=int(numProcessors))
for i in range(int(minI), int(maxI), int(iStep)):
    ...

Then, move your target callable outside the dynamic loop:

def functionB(ab):
    a, b = ab  # pool.map passes each (a, b) tuple as a single argument
    ...

def main():
    ...
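Put together, the two changes might look roughly like the sketch below. It is not the asker's actual program: `function_b` simply multiplies its inputs as a stand-in for the `subprocess.call(...)` invocations, and the small ranges and the `num_processors` default are assumptions chosen to keep the example self-contained.

```python
import itertools
import multiprocessing


def function_b(ab):
    # Defined at module level so worker processes can find it by name.
    a, b = ab  # pool.map passes each (a, b) pair as a single argument
    # Stand-in for the subprocess.call(...) work in the question.
    return a * b


def main(num_processors=2):
    # Create the pool once, before any looping.
    pool = multiprocessing.Pool(processes=num_processors)
    try:
        pairs = list(itertools.product(range(3), range(3)))
        return pool.map(function_b, pairs)
    finally:
        # Shut the workers down cleanly whether or not map succeeded.
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(main())  # [0, 0, 0, 0, 1, 2, 0, 2, 4]
```

Because `pool.map` preserves input order, the results line up with the order of `pairs`.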

Consider this example...

Broken

import multiprocessing

def broken():
    vals = [1,2,3]

    def test(x):
        return x

    pool = multiprocessing.Pool()
    output = pool.map(test, vals)
    print output

broken()
# PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Working

import multiprocessing

def test(x):
    return x

def working():
    vals = [1,2,3]

    pool = multiprocessing.Pool()
    output = pool.map(test, vals)
    print output

working()
# [1, 2, 3]
answered 2012-07-02T03:42:33.263