2

我有一个需要运行的计算列表。我正在使用它们并行化它们

from pathos.multiprocessing import ProcessingPool
pool = ProcessingPool(nodes=7)
values = pool.map(helperFunction, someArgs)

helperFunction确实创建了一个名为 的类Parameters,该类与在同一文件中定义

import otherModule
class Parameters(otherModule.Parameters):
    ...

到目前为止,一切都很好。helperFunction会根据Parameters对象做一些计算,改变它的一些属性,最后使用pickle. 这是进行保存的辅助函数(来自不同的模块)的相关摘录:

import pickle
import hashlib
import os
class cacheHelper():

    def __init__(self, fileName, attr=[], folder='../cache/'):
        self.folder = folder

        if len(attr) > 0:
            attr = self.attrToName(attr)
        else:
            attr = ''
        self.fileNameNaked = fileName
        self.fileName = fileName + attr

    def write(self, objects):
        with open(self.getFile(), 'wb') as output:
            for object in objects:
                pickle.dump(object, output, pickle.HIGHEST_PROTOCOL)

当它到达时pickle.dump(),它会引发一个难以调试的异常,因为调试器不会进入实际面临该异常的工作人员。因此,我在转储发生之前创建了一个断点,并手动输入了该命令。这是输出:

>>> pickle.dump(objects[0], output, pickle.HIGHEST_PROTOCOL)
Traceback (most recent call last):
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/site-packages/IPython/core/interactiveshell.py", line 2885, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-1-4d2cbb7c63d1>", line 1, in <module>
    pickle.dump(objects[0], output, pickle.HIGHEST_PROTOCOL)
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/pickle.py", line 1376, in dump
    Pickler(file, protocol).dump(obj)
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/pickle.py", line 396, in save_reduce
    save(cls)
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/site-packages/dill/dill.py", line 1203, in save_type
    StockPickler.save_global(pickler, obj)
  File "/usr/local/anaconda2/envs/myenv2/lib/python2.7/pickle.py", line 754, in save_global
    (obj, module, name))
PicklingError: Can't pickle <class '__main__.Parameters'>: it's not found as __main__.Parameters

helperFunction奇怪的是,当我不并行化(即手动循环)时,这不会发生。我很确定我打开的是正确的Parameters(而不是父类)。

我知道如果没有可重现的示例,很难调试事物,我不希望这部分有任何解决方案。也许更普遍的问题是:

pickle.dump()并行化通过另一个模块使用的代码时需要注意什么?

4

1 回答 1

2

直接来自 Python文档

12.1.4. 什么可以腌制和不腌制?可以腌制以下类型:

  • 无、真和假
  • 整数、浮点数、复数
  • 字符串、字节、字节数组
  • 元组、列表、集合和
  • 仅包含在模块顶层定义的可提取对象函数的字典(使用 def,而不是 lambda)
  • 在模块顶层定义的内置函数
  • 在模块顶层定义的类
  • 这些类的实例,其__dict__或调用的结果__getstate__()是可腌制的(有关详细信息,请参阅腌制类实例部分)。

其他的都不能腌制。就您而言,尽管鉴于您的代码摘录很难说,但我认为问题在于该类Parameters未在模块的顶层定义,因此无法腌制其实例。

使用pathos.multiprocessing(或其积极开发的 fork multiprocess)而不是 built-in的全部意义multiprocessing在于避免pickle,因为有太多东西后来无法转储。pathos.multiprocessingmultiprocess使用dill而不是pickle. 如果你想调试一个工人,你可以使用trace

注意正如 Mike McKerns( 的主要贡献者multiprocess)正确地注意到的那样,有些情况甚至dill无法处理,尽管很难就此问题制定一些通用规则。

于 2016-07-08T18:56:50.407 回答