319

很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂而无法发布。如果我在 IPython shell 而不是常规的 Python 中运行程序,一切都会顺利进行。

我查阅了有关此问题的一些先前注释。它们都是由使用池调用类函数中定义的函数引起的。但这对我来说不是这样。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我将不胜感激任何帮助。

更新:我腌制的功能是在模块的顶层定义的。虽然它调用了一个包含嵌套函数的函数。即,f()调用具有嵌套函数的g()调用,而我正在调用。, ,都在顶层定义。我用这种模式尝试了更简单的例子,但它确实有效。h()i()pool.apply_async(f)f()g()h()

4

9 回答 9

376

这是可以腌制的清单。特别是,函数只有在模块的顶层定义时才是可挑选的。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生的错误几乎与您发布的错误相同:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题是这些pool方法都使用 amp.SimpleQueue将任务传递给工作进程。通过的所有东西都mp.SimpleQueue必须是可挑选的,并且foo.work是不可挑选的,因为它没有在模块的顶层定义。

可以通过在顶层定义一个函数来修复它,该函数调用foo.work()

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

请注意,它foo是可挑选的,因为Foo它是在顶层定义的并且 foo.__dict__是可挑选的。

于 2012-01-10T14:54:13.477 回答
122

我会使用pathos.multiprocesssing, 而不是multiprocessing. pathos.multiprocessing是一个multiprocessing使用dill. dill可以在 python 中序列化几乎任何东西,所以你可以并行发送更多。pathosfork 还可以直接使用多个参数函数,因为您需要类方法。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

在此处获取pathos(如果您愿意,可以dill): https ://github.com/uqfoundation

于 2014-01-25T01:34:08.600 回答
39

当这个问题出现时,multiprocessing一个简单的解决方案就是从 切换PoolThreadPool. 这可以在不更改导入以外的代码的情况下完成 -

from multiprocessing.pool import ThreadPool as Pool

这是因为 ThreadPool 与主线程共享内存,而不是创建一个新进程——这意味着不需要酸洗。

这种方法的缺点是 python 不是处理线程的最佳语言 - 它使用称为全局解释器锁的东西来保持线程安全,这可能会减慢这里的一些用例。但是,如果您主要与其他系统交互(运行 HTTP 命令、与数据库通信、写入文件系统),那么您的代码可能不受 CPU 约束并且不会受到太大影响。事实上,我在编写 HTTP/HTTPS 基准测试时发现,这里使用的线程模型具有更少的开销和延迟,因为创建新进程的开销远高于创建新线程的开销,否则程序只是等待 HTTP回应。

因此,如果您在 python 用户空间中处理大量内容,这可能不是最好的方法。

于 2019-11-17T03:34:32.370 回答
35

正如其他人所说multiprocessing,只能将 Python 对象传输到可以腌制的工作进程。如果你不能按照 unutbu 的描述重新组织你的代码,你可以使用dill扩展的 pickling/unpickling 功能来传输数据(尤其是代码数据),如下所示。

此解决方案只需要安装dill而不需要其他库pathos

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
于 2014-07-10T09:56:14.773 回答
25

我发现我还可以通过尝试在其上使用探查器,在一段完美工作的代码上准确地生成该错误输出。

请注意,这是在 Windows 上(分支不太优雅)。

我之前在跑步:

python -m profile -o output.pstats <script> 

发现去掉profiling去掉了错误,放置profiling又恢复了。也让我发疯,因为我知道过去的代码可以工作。我正在检查是否有东西更新了 pool.py... 然后有一种下沉的感觉并消除了分析,就是这样。

在这里张贴档案以防其他人遇到它。

于 2012-10-31T04:25:05.103 回答
5
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如果您在传递给异步作业的模型对象中有任何内置函数,也会出现此错误。

因此,请确保检查传递的模型对象没有内置函数。(在我们的例子中,我们使用模型内部的django-model-utilsFieldTracker()函数来跟踪某个字段)。这是相关 GitHub 问题的链接。

于 2017-05-26T11:11:27.680 回答
4

此解决方案只需要安装 dill 而不需要安装其他库作为 pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

它也适用于 numpy 数组。

于 2015-09-27T12:13:20.830 回答
0

基于@rocksportrocker 解决方案,在发送和接收结果时进行挖掘是有意义的。

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
于 2019-07-24T20:00:23.453 回答
0

正如@penky Suresh 在这个答案中所建议的那样,不要使用内置关键字。

显然args是处理多处理时的内置关键字


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.



PS:制表符/空格可能有点偏离。

于 2021-09-22T16:16:06.000 回答