17

三个问题可能重复(但过于具体):

通过回答这个问题,可以回答所有其他三个问题。希望我说清楚:

一旦我在由多处理创建的某个进程中创建了一个对象:

  1. 如何将该对象的引用传递给其他进程?
  2. (不那么重要)我如何确保在我持有参考时这个过程不会死掉?

示例 1(已解决)

from concurrent.futures import *

def f(v):
    return lambda: v * v

if __name__ == '__main__':
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
        l = list(e.map(f, [1,2,3,4]))
    print([g() for g in l]) # [1, 4, 9, 16]

示例 2

假设f返回一个状态可变的对象。这个相同的对象应该可以从其他进程访问。

示例 3

我有一个对象,它有一个打开的文件和一个锁 - 我如何授予对其他进程的访问权限?

提醒

我不希望出现此特定错误。或针对此特定用例的解决方案。该解决方案应该足够通用,以便在进程之间共享不可移动的对象。对象可能在任何进程中创建。使所有对象都可移动并保留身份的解决方案也很好。

欢迎任何提示,任何指向如何实现解决方案的部分解决方案或代码片段都是值得的。所以我们可以一起创造一个解决方案。

这是解决此问题但没有多处理的尝试: https ://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

您希望其他进程如何处理引用?

引用可以传递给使用多处理创建的任何其他进程(重复 3)。可以访问属性,调用引用。访问的属性可能是也可能不是代理。

仅使用代理有什么问题?

也许没有问题,只有挑战。我的印象是代理有一个管理器,并且一个管理器有自己的进程,因此不可序列化的对象必须被序列化和传输(使用 StacklessPython/fork 部分解决)。还存在特殊对象的代理 - 为所有对象(可解决)构建代理很难但并非不可能。

解决方案?- 代理+经理?

Eric Urban 表明序列化不是问题。真正的挑战在于示例 2 和 3:状态的同步。我对解决方案的想法是为经理创建一个特殊的代理类。这个代理类

  1. 接受不可序列化对象的构造函数
  2. 获取一个可序列化的对象并将其传输到管理器进程。
  3. (问题)根据1.不可序列化对象必须在manager进程中创建。
4

3 回答 3

12

大多数时候,将现有对象的引用传递给另一个进程并不是很理想。相反,您创建要在进程之间共享的类:

class MySharedClass:
    # stuff...

然后你做一个这样的代理管理器:

import multiprocessing.managers as m
class MyManager(m.BaseManager):
    pass # Pass is really enough. Nothing needs to be done here.

然后您在该 Manager 上注册您的课程,如下所示:

MyManager.register("MySharedClass", MySharedClass)

然后,一旦管理器被实例化并启动,manager.start()您就可以使用manager.MySharedClass. 这应该适用于所有需求。返回的代理与原始对象完全一样,除了文档中描述的一些例外情况。

于 2014-03-05T09:59:23.863 回答
5

在阅读此答案之前,请注意其中解释的解决方案很糟糕。请注意答案末尾的警告。

我找到了一种通过multiprocessing.Array. 所以我制作了这个类,它通过所有进程透明地共享它的状态:

import multiprocessing as m
import pickle

class Store:
    pass

class Shareable:
    def __init__(self, size = 2**10):
        object.__setattr__(self, 'store', m.Array('B', size))
        o = Store() # This object will hold all shared values
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

    def __getattr__(self, name):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        return getattr(o, name)

    def __setattr__(self, name, value):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        setattr(o, name, value)
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

def store(arr, s):
    for i, ch in enumerate(s):
        arr[i] = ch

def load(arr):
    l = arr[:]
    return bytes(arr)

您可以将此类(及其子类)的实例传递给任何其他进程,它将通过所有进程同步它的状态。这是用这段代码测试的:

class Foo(Shareable):
    def __init__(self):
        super().__init__()
        self.f = 1

    def foo(self):
        self.f += 1

def f(s):
    s.f += 1

if __name__ == '__main__':
    import multiprocessing as m
    import time
    s = Foo()
    print(s.f)
    p = m.Process(target=f, args=(s,))
    p.start()
    time.sleep(1)
    print(s.f)

此类的“魔力”在于它将所有属性存储在该类的另一个实例中Store。这门课不是很特别。它只是一些可以具有任意属性的类。(一个 dict 也可以。)

然而,这个类有一些非常讨厌的怪癖。我找到了两个。

Store第一个怪癖是您必须指定实例最多占用多少空间。这是因为multiprocessing.Array具有静态大小。所以里面可以腌制的对象只能是数组那么大。

第二个怪癖是您不能将此类与 ProcessPoolExecutors 或简单的 Pools 一起使用。如果你尝试这样做,你会得到一个错误:

>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
...     e.submit(f, args=(s,))
... 
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

警告
您可能不应该使用这种方法,因为它使用了无法控制的内存量,与使用代理相比过于复杂(请参阅我的其他答案),并且可能会以惊人的方式崩溃。

于 2014-03-02T21:03:20.067 回答
3

只需使用无堆栈python。你可以用 序列化几乎任何东西pickle,包括函数。在这里,我lambda使用pickle模块对 a 进行序列化和反序列化。这类似于您在示例中尝试执行的操作。

这是 Stackless Python http://www.stackless.com/wiki/Download的下载链接

Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> f = 5
>>> g = lambda : f * f
>>> g()
25
>>> import pickle
>>> p = pickle.dumps(g)
>>> m = pickle.loads(p)
>>> m()
25
>>> 
于 2014-02-23T16:08:32.947 回答