1

我正在使用 Python 2.7multiprocessing.Pool来管理一个由 3 个工作人员组成的池。每个工作人员都相当复杂,并且在某些第三方代码中(可能)存在资源泄漏,导致连续运行 6-8 小时后出现问题。所以我想用maxtasksperchild定期刷新工人。

我还希望每个工作人员都写入自己单独的日志文件。没有maxtasksperchild我使用 sharedmultiprocessing.Value为每个 worker 分配一个整数(0、1 或 2),然后使用该整数来命名日志文件。

一旦工作人员完成,maxtasksperchild我想重用日志文件。因此,如果整个事情运行一个月,我只需要三个日志文件,而不是每个生成的工人的一个日志文件。

如果我可以传递一个回调(例如finalizer,与当前支持的一起使用initializer),这将是直截了当的。没有它,我看不到一个强大而简单的方法来做到这一点。

4

2 回答 2

1

这是 AFAIK 未记录的,但multiprocessing有一个Finalizer类,“它支持使用弱引用进行对象终结”。您可以使用它在您的initializer.

不过,在这种情况下,我看不到multiprocessing.Value有用的同步选择。多个工作人员可以同时退出,表明哪些文件整数是空闲的,这比(锁定的)计数器所能提供的要多。

我建议使用多个 bare multiprocessing.Locks,每个文件一个,而不是:

from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize


def f(n):
    global fileno
    for _ in range(int(n)):  # xrange for Python 2
        pass
    return fileno


def init_fileno(file_locks):
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):  # non-blocking attempt
            globals()['fileno'] = i
            print("{} using fileno: {}".format(current_process().name, i))
            Finalize(lock, lock.release, exitpriority=15)
            break


if __name__ == '__main__':

    n_proc = 3
    file_locks = [Lock() for _ in range(n_proc)]

    pool = Pool(
        n_proc, initializer=init_fileno, initargs=(file_locks,),
        maxtasksperchild=2
    )

    print(pool.map(func=f, iterable=[50e6] * 18))
    pool.close()
    pool.join()
    # all locks should be available if all finalizers did run
    assert all(lock.acquire(False) for lock in file_locks)

输出:

ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]

Process finished with exit code 0

请注意,在 Python 3 中,您不能可靠地使用 Pool 的上下文管理器,而不是使用上面显示的旧方法。Pool 的上下文管理器(不幸的是)调用,这可能会在终结器有机会运行之前terminate()杀死工作进程。

于 2020-01-15T22:07:19.360 回答
0

我最终选择了以下内容。它假设 PID 不会很快被回收(对我来说在 Ubuntu 上是这样,但在 Unix 上一般不会)。我认为它没有做任何其他假设,但我真的只是对 Ubuntu 感兴趣,所以我没有仔细研究 Windows 等其他平台。

该代码使用一个数组来跟踪哪些 PID 声明了哪个索引。然后,当一个新的 worker 启动时,它会查看是否有任何 PID 不再使用。如果它找到一个,它假定这是因为工人已经完成了它的工作(或因其他原因被终止)。如果它没有找到一个,那么我们就不走运了!所以这并不完美,但我认为它比我迄今为止看到或考虑过的任何东西都简单。

def run_pool():
    child_pids = Array('i', 3)
    pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)

def init_worker(child_pids):
    with child_pids.get_lock():
        available_index = None
        for index, pid in enumerate(child_pids):
            # PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
            # which are no longer in use. We also reclaim the lucky case where a PID was recycled
            # but assigned to one of our workers again, so we know we can take it over
            if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
                available_index = index
                break

        if available_index is not None:
            child_pids[available_index] = os.getpid()
        else:
            # This is unexpected - it means all of the PIDs are in use so we have a logical error
            # or a PID was recycled before we could notice and reclaim its index
            pass

def _is_pid_in_use(pid):
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        return False
于 2020-01-16T18:14:14.420 回答