
Suppose you are using a multiprocessing.Pool object, and you are using the initializer setting of the constructor to pass an initializer function that then creates a resource in the global namespace. Assume the resource has a context manager. How would you handle the life-cycle of the context-managed resource, provided it has to live through the life of the process but be properly cleaned up at the end?

So far, I have something somewhat like this:

resource_cm = None
resource = None


def _worker_init(args):
    global resource_cm, resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

From here on, the pool processes can use the resource. So far so good. But handling clean up is a bit trickier, since the multiprocessing.Pool class does not provide a destructor or deinitializer argument.

One of my ideas is to use the atexit module, and register the clean up in the initializer. Something like this:

def _worker_init(args):
    global resource_cm, resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        resource_cm.__exit__(None, None, None)

    import atexit
    atexit.register(_clean_up)

Is this a good approach? Is there an easier way of doing this?

EDIT: atexit does not seem to work. At least not in the way I am using it above, so as of right now I still do not have a solution for this problem.


3 Answers


First of all, this is a really great question! After digging around in the multiprocessing code a bit, I think I've found a way to do this:

When you start a multiprocessing.Pool, internally the Pool object creates a multiprocessing.Process object for each member of the pool. When those sub-processes are starting up, they call a _bootstrap function, which looks like this:

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0 
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)

The run method is what actually runs the target you provided to the Process object. For Pool processes, that's a method with a long-running while loop that waits for work items to come in over internal queues. What's really interesting for us is what happens after self.run() is called: util._exit_function() is invoked.

As it turns out, that function does some cleanup that sounds a lot like what you're looking for:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
    _run_finalizers(0)

Here is the docstring of _run_finalizers:

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''

The method actually runs through the list of finalizer callbacks and executes them:

items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()

Perfect. So how do we get into the _finalizer_registry? There is an undocumented object called Finalize in multiprocessing.util that is responsible for adding a callback to the registry:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That's what we're looking for!
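One detail worth noting: when obj is not None, the weakref means the callback also fires as soon as the object is garbage-collected, not only at process exit. A quick sketch (the class and names are my own):

```python
import gc
from multiprocessing.util import Finalize

class Thing:
    pass

fired = []
t = Thing()
# No exitpriority: this finalizer is tied purely to the object's lifetime.
Finalize(t, fired.append, args=("collected",))
del t
gc.collect()  # CPython fires the weakref on `del` already; be explicit anyway
print(fired)
```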

OK, so putting it all together into an example:

import multiprocessing
from multiprocessing.util import Finalize

resource_cm = None
resource = None

class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())

def open_resource(args):
    return Resource(args)

def _worker_init(args):
    global resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer
    Finalize(resource, resource.__exit__, exitpriority=16)

def hi(*args):
    print("we're in the worker")

if __name__ == "__main__":
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
    pool.map(hi, range(pool._processes))
    pool.close()
    pool.join()

Output:

calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>

As you can see, __exit__ gets called for all of our workers when we join() the pool.

Answered 2014-07-13T15:48:30.933

You can subclass Process and override its run() method so that it performs cleanup before exiting. Then you should subclass Pool so that it uses your subclassed process:

from multiprocessing import Process
from multiprocessing.pool import Pool

class SafeProcess(Process):
    """ Process that will cleanup before exit """
    def run(self, *args, **kw):
        result = super().run(*args, **kw)
        # cleanup however you want here
        return result


class SafePool(Pool):
    Process = SafeProcess


pool = SafePool(4)  # use it as standard Pool
Answered 2018-07-27T11:18:51.603

This is the solution I came up with. It uses billiard, a fork of the Python multiprocessing package. This solution requires using the private API Worker._ensure_messages_consumed, so I would advise against using it in production. I just needed this for a side project, so it was good enough for me. Use at your own risk.

from billiard import pool
from billiard.pool import Pool, Worker

class SafeWorker(Worker):
    # this function is called just before a worker process exits
    def _ensure_messages_consumed(self, *args, **kwargs):
        # Not necessary, but you can move `Pool.initializer` logic here if you want.
        out = super()._ensure_messages_consumed(*args, **kwargs)
        # Do clean up work here
        return out

class SafePool(Pool):
    Worker = SafeWorker

Another solution I tried was implementing my cleanup logic as a signal handler, but that didn't work because both multiprocessing and billiard kill their worker processes with exit(). I'm not sure how atexit works internally, but that is probably also why the atexit approach doesn't work.

Answered 2022-01-04T00:33:05.677