
I'm aware of the various discussions about the limitations of the multiprocessing module when dealing with functions that are data members of a class (due to pickling problems).

But is there another module, or any kind of workaround in multiprocessing, that allows something like the following (specifically, without forcing the definition of the function to be applied in parallel to live outside the class)?

class MyClass():

    def __init__(self):
        self.my_args = [1,2,3,4]
        self.output  = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self):
        # Use map or map_async to map my_single_function onto the
        # list of self.my_args, and append the return values into
        # self.output, using each arg in my_args as the key.

        # The result should make self.output become
        # {1:1, 2:4, 3:9, 4:16}


foo = MyClass()
foo.my_parallelized_function()
print foo.output

Note: I could easily do this by moving my_single_function out of the class and passing something like foo.my_args to the map or map_async call. But that pushes the parallel execution of the function outside of MyClass.
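For reference, that workaround would look roughly like this (a sketch only; the squaring stands in for the real work):

import multiprocessing as mp

def my_single_function(arg):   # moved out of the class
    return arg**2

foo = MyClass()
pool = mp.Pool()
foo.output = dict(zip(foo.my_args,
                      pool.map(my_single_function, foo.my_args)))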

For my application (parallelizing a large data query that retrieves, joins, and cleans monthly cross-sections of data and then appends them into a long time series of such cross-sections), it is very important to keep this functionality inside the class, because different users of my program will instantiate different instances of the class with different time intervals, different time increments, different subsets of data to collect, and so on, all of which should be associated with that instance.

Hence, I would like the instance to do the work of parallelization as well, since it owns all the data relevant to the parallelized query, and it would be silly to try to write some hacky wrapper function that binds to certain arguments and lives outside the class (especially since such a function would be non-generic; it would need all kinds of details from inside the class).


3 Answers


Steven Bethard has posted a way to allow methods to be pickled/unpickled. You can use it like this:

import multiprocessing as mp
import copy_reg
import types

def _pickle_method(method):
    # Author: Steven Bethard
    # http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Author: Steven Bethard
    # http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class MyClass(object):

    def __init__(self):
        self.my_args = [1,2,3,4]
        self.output  = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self):
        # Use map or map_async to map my_single_function onto the
        # list of self.my_args, and append the return values into
        # self.output, using each arg in my_args as the key.

        # The result should make self.output become
        # {1:1, 2:4, 3:9, 4:16}
        self.output = dict(zip(self.my_args,
                               pool.map(self.my_single_function, self.my_args)))

Then

pool = mp.Pool()   
foo = MyClass()
foo.my_parallelized_function()

yields

print foo.output
# {1: 1, 2: 4, 3: 9, 4: 16}
Answered on 2012-07-30T18:07:59.043

If you use a fork of multiprocessing called pathos.multiprocessing, you can use classes and class methods directly in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.
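As a small illustration of the difference (a sketch only, assuming dill is installed): the stdlib pickle in Python 2 cannot serialize a bound method, while dill can:

>>> import dill
>>> class Test(object):
...   def plus(self, x, y):
...     return x + y
... 
>>> t = Test()
>>> s = dill.dumps(t.plus)   # pickle.dumps(t.plus) raises PicklingError here
>>> dill.loads(s)(2, 3)
5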

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).

See: What can multiprocessing and dill do together?

And: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
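The asynchronous map mentioned above could look roughly like this (a sketch, assuming ProcessingPool exposes amap and reusing p, t, x, y from the session above; the call returns a handle whose get() blocks until the results are ready):

>>> r = p.amap(t.plus, x, y)   # asynchronous: returns immediately with a result handle
>>> r.get()                    # block until the workers finish
[4, 6, 8, 10]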

So you can do exactly what you want, I believe.

Python 2.7.8 (default, Jul 13 2014, 02:29:54) 
[GCC 4.2.1 Compatible Apple Clang 4.1 ((tags/Apple/clang-421.11.66))] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> 
>>> class MyClass():
...   def __init__(self):
...     self.my_args = [1,2,3,4]
...     self.output = {}
...   def my_single_function(self, arg):
...     return arg**2
...   def my_parallelized_function(self):
...     res = p.map(self.my_single_function, self.my_args)
...     self.output = dict(zip(self.my_args, res))
... 
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool()
>>> 
>>> foo = MyClass()
>>> foo.my_parallelized_function()
>>> foo.output
{1: 1, 2: 4, 3: 9, 4: 16}
>>>

Get the code here: https://github.com/uqfoundation/pathos

Answered on 2014-01-30T04:31:46.150

I believe there is a more elegant solution. Add the following lines to the code that does multiprocessing with your class, and you can still pass methods through the pool. The lines should go above the class definition.

import copy_reg
import types

def _reduce_method(meth):
    # Pickle a bound method as (getattr, (instance, method_name)); unpickling
    # then calls getattr(instance, method_name) to rebuild the bound method.
    return (getattr, (meth.__self__, meth.__func__.__name__))

copy_reg.pickle(types.MethodType, _reduce_method)
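A hedged usage sketch, combining this registration with the MyClass from the question (it assumes a multiprocessing.Pool as in the first answer; note the instance itself must be picklable, since the bound method is rebuilt via getattr on an unpickled copy of it):

import multiprocessing as mp
import copy_reg
import types

def _reduce_method(meth):
    return (getattr, (meth.__self__, meth.__func__.__name__))

copy_reg.pickle(types.MethodType, _reduce_method)

class MyClass(object):
    def __init__(self):
        self.my_args = [1, 2, 3, 4]
        self.output = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self, pool):
        self.output = dict(zip(self.my_args,
                               pool.map(self.my_single_function, self.my_args)))

if __name__ == '__main__':
    pool = mp.Pool()
    foo = MyClass()
    foo.my_parallelized_function(pool)
    print foo.output   # {1: 1, 2: 4, 3: 9, 4: 16}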

For more on how methods are pickled, see http://docs.python.org/2/library/copy_reg.html

Answered on 2013-11-08T14:36:12.997