127

是否可以创建一个非守护进程的 python 池?我希望一个池能够调用一个内部有另一个池的函数。

我想要这个,因为守护进程无法创建进程。具体来说,它会导致错误:

AssertionError: daemonic processes are not allowed to have children

例如,考虑function_a有一个运行的池和运行的池function_b的场景function_c。这个函数链会失败,因为function_b正在一个守护进程中运行,而守护进程无法创建进程。

4

8 回答 8

140

该类multiprocessing.pool.Pool在其方法中创建工作进程,使它们成为守护进程并启动它们,并且在它们启动之前无法将其属性__init__重新设置为(之后不再允许)。但是您可以创建自己的(只是一个包装函数) 的子类并替换您自己的子类,它始终是非守护进程,用于工作进程。daemonFalsemultiprocesing.pool.Poolmultiprocessing.Poolmultiprocessing.Process

这是如何执行此操作的完整示例。重要的部分是两个类NoDaemonProcess,在顶部和最后MyPool调用你的实例。pool.close()pool.join()MyPool

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
于 2012-01-22T18:46:24.080 回答
45

我有必要在 Python 3.7 中使用非守护程序池,并最终调整了已接受答案中发布的代码。下面是创建非守护程序池的片段:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

由于当前的实现multiprocessing已被广泛重构为基于上下文,我们需要提供一个NoDaemonContext具有我们的NoDaemonProcessas 属性的类。NestablePool然后将使用该上下文而不是默认上下文。

也就是说,我应该警告这种方法至少有两个警告:

  1. 它仍然取决于multiprocessing包的实现细节,因此可能随时中断。
  2. multiprocessing使用非守护进程变得如此困难是有正当理由的,这里解释了其中的许多。在我看来最有说服力的是:

至于允许子线程使用子进程产生自己的子进程,如果父线程或子线程在子进程完成并返回之前终止,则存在创建一小群僵尸“孙子”的风险。

于 2018-11-06T22:16:26.830 回答
27

multiprocessing模块有一个很好的接口来使用带有进程线程的池。根据您当前的用例,您可能会考虑使用multiprocessing.pool.ThreadPool外部池,这将导致线程(允许从内部生成进程)而不是进程。

它可能受到 GIL 的限制,但在我的特定情况下(我测试了两者) ,此处Pool创建的外部进程的启动时间远远超过了.ThreadPool


真的很容易ProcessesThreads在此处此处阅读有关如何使用ThreadPool解决方案的更多信息。

于 2015-12-03T14:50:21.207 回答
20

从 Python 3.8 开始,concurrent.futures.ProcessPoolExecutor没有这个限制。它可以有一个完全没有问题的嵌套进程池:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

上面的演示代码是用 Python 3.8 测试的。

然而,一个限制ProcessPoolExecutor是它没有maxtasksperchild. 如果您需要这个,请考虑Massimiliano 的答案

学分:jfs 的回答

于 2020-04-27T23:57:55.467 回答
7

在某些 Python 版本中,将标准 Pool 替换为 custom 会引发错误:AssertionError: group argument must be None for now.

在这里,我找到了一个可以提供帮助的解决方案:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
于 2019-01-22T08:34:05.440 回答
4

我遇到的问题是尝试在模块之间导入全局变量,导致 ProcessPool() 行被多次评估。

全局变量.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

然后从代码中的其他地方安全导入

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

我在pathos.multiprocessing这里写了一个更扩展的包装类:

附带说明一下,如果您的用例只需要异步多进程映射作为性能优化,那么 joblib 将在后台管理您的所有进程池并允许使用这种非常简单的语法:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
于 2017-11-24T20:50:01.223 回答
3

我见过人们通过使用称为billiardcelery的分支(多处理池扩展)来处理这个问题,它允许守护进程产生子进程。解决方法是通过以下方式简单地替换模块:multiprocessingmultiprocessing

import billiard as multiprocessing
于 2020-05-08T09:52:16.693 回答
1

当错误似乎是误报时,这提供了一种解决方法。正如James 所指出的,这可能发生在从守护进程无意导入的情况下。

例如,如果你有以下简单的代码,WORKER_POOL可能会无意中从一个worker中导入,从而导致错误。

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

一种简单但可靠的解决方法是:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://stackoverflow.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

在上面的解决方法中,MyClass.worker_pool可以使用没有错误。如果您认为这种方法可以改进,请告诉我。

于 2020-09-21T00:07:24.227 回答