57

我有一个相当复杂的 Python 对象,需要在多个进程之间共享。我使用multiprocessing.Process. 当我在其中共享一个对象时multiprocessing.Queuemultiprocessing.Pipe它们共享得很好。但是当我尝试与其他非多处理模块对象共享一个对象时,似乎 Python 分叉了这些对象。真的吗?

我尝试使用 multiprocessing.Value。但我不确定应该是什么类型?我的对象类称为 MyClass。但是当我尝试时multiprocess.Value(MyClass, instance),它失败了:

TypeError: this type has no size

知道发生了什么吗?

4

6 回答 6

65

经过大量研究和测试,我发现“Manager”在非复杂对象级别上完成了这项工作。

下面的代码显示对象inst是在进程之间共享的,这意味着当子进程更改它时,它的属性varinst在外部更改。

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var
        

def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

好的,如果您只需要共享简单的对象,上面的代码就足够了

为什么不复杂?因为如果您的对象是嵌套的(对象内的对象),它可能会失败:

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var
        

class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there's almost no way to do it
    #even if you did register child class, you may get PicklingError :)

我认为这种行为的主要原因是因为Manager它只是一个建立在管道/队列等低级通信工具之上的直板。

因此,这种方法推荐用于多处理情况。如果您可以使用锁/信号量/管道/队列等低级工具或Redis 队列Redis 发布/订阅等高级工具来处理复杂的用例(仅我的建议,哈哈),那总是更好。

于 2016-09-28T09:07:27.827 回答
41

您可以使用 Python 的multiprocessing管理器”类和您定义的代理类来执行此操作。请参阅Python 文档中的代理对象

您要做的是为您的自定义对象定义一个代理类,然后使用“远程管理器”共享该对象——查看“使用远程管理器”部分中的同一链接文档页面中的示例,其中文档展示如何共享远程队列。您将做同样的事情,但您的调用your_manager_instance.register()将在其参数列表中包含您的自定义代理类。

通过这种方式,您将设置服务器以与自定义代理共享自定义对象。您的客户端需要访问服务器(同样,请参阅优秀的文档示例,了解如何设置客户端/服务器对远程队列的访问,但不是共享 a Queue,而是共享对特定类的访问)。

于 2010-09-13T00:52:11.133 回答
7

这是我为此制作的python包(在进程之间共享复杂对象)。

混帐:https ://github.com/dRoje/pipe-proxy

这个想法是您为您的对象创建一个代理并将其传递给一个进程。然后你使用代理,就像你有对原始对象的引用一样。虽然你只能使用方法调用,所以访问对象变量是通过抛出 setter 和 getter 来完成的。

假设我们有一个名为“example”的对象,创建代理和代理侦听器很容易:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

现在您将代理发送到另一个进程。

p = Process(target=someMethod, args=(exampleProxy,)) p.start()

在其他过程中使用它,就像使用原始对象一样(示例):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

但是你必须在主进程中听它:

exampleProxyListener.listen()

阅读更多并在此处查找示例:

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/

于 2017-11-13T08:23:54.447 回答
7

在 Python 3.6 中,文档说:

在 3.6 版更改: 共享对象可以嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些共享对象都将由 SyncManager 管理和同步。

只要通过 SyncManager 创建实例,您就应该能够使对象相互引用。但是,在另一种类型的对象的方法中动态创建一种类型的对象可能仍然是不可能的或非常棘手的。

编辑:我偶然发现了这个问题Multiprocessing manager and custom classes with python 3.6.5 and 3.6.7。需要检查python 3.7

编辑 2:由于其他一些问题,我目前无法使用 python3.7 进行测试。https://stackoverflow.com/a/50878600/7541006中提供的解决方法对我来说很好

于 2019-03-22T15:35:16.887 回答
2

我尝试使用 BaseManager 并注册我的自定义类以使其快乐,并解决关于嵌套类的问题,正如 Tom 上面提到的那样。

我认为主要原因与所说的嵌套类无关,而是python采用低级的通信机制。原因是python使用一些类似套接字的通信机制来同步在低级别的服务器进程中对自定义类的修改。我认为它封装了一些 rpc 方法,使其对用户透明,就好像他们调用了嵌套类对象的本地方法一样。

所以,当你想修改、检索你的自定义对象或一些第三方对象时,你应该在你的进程中定义一些接口来与之通信,而不是直接获取或设置值。

但是在对嵌套对象中的多嵌套对象进行操作时,可以忽略上面提到的问题,就像您在普通例程中所做的那样,因为您在注册类中的嵌套对象不再是代理对象,对其进行操作将不再通过类似套接字的通信例程并且已本地化。

这是我为解决问题而编写的可行代码。

from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
       def __init__(self):
                self.val = 1

class CustomObj(object):
        def __init__(self, numpy_obj):
                self.numpy_obj = numpy_obj
                self.nested_obj = NestedObj()

        def set_value(self, p, q, v):
                self.numpy_obj[p, q] = v

        def get_obj(self):
                return self.numpy_obj

        def get_nested_obj(self):
                return self.nested_obj.val

class CustomProcess(Process):
        def __init__(self, obj, p, q, v):
                super(CustomProcess, self).__init__()
                self.obj = obj
                self.index = p, q
                self.v = v

        def run(self):
                self.obj.set_value(*self.index, self.v)



if __name__=="__main__":
        BaseManager.register('CustomObj', CustomObj)
        manager = BaseManager()
        manager.start()
        data = [[0 for x in range(10)] for y in range(10)]
        matrix = np.matrix(data)
        custom_obj = manager.CustomObj(matrix)
        print(custom_obj.get_obj())
        process_list = []
        for p in range(10):
                for q in range(10):
                        proc = CustomProcess(custom_obj, p, q, 10*p+q)
                        process_list.append(proc)
        for x in range(100):
                process_list[x].start()
        for x in range(100):
                process_list[x].join()
        print(custom_obj.get_obj())
        print(custom_obj.get_nested_obj())
于 2018-06-06T05:19:35.010 回答
0

为了避免共享资源的一些麻烦,您可以尝试在由 eg 映射的函数的 return 语句中收集需要访问单例资源的数据,pool.imap_unordered然后在检索部分结果的循环中进一步处理它:

for result in in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

如果返回的数据不多,那么执行此操作可能不会有太多开销。

于 2017-06-11T10:08:08.427 回答