9

当使用 Pool.apply_async 运行大量任务(大参数)时,进程被分配并进入等待状态,等待进程的数量没有限制。这可能最终会吃掉所有内存,如下例所示:

import multiprocessing
import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test():

    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()

我正在寻找一种限制等待队列的方法,即只有有限数量的等待进程,并且 Pool.apply_async 在等待队列已满时被阻塞。

4

4 回答 4

7

multiprocessing.Pool有一个_taskqueuetype 的成员multiprocessing.Queue,它接受一个可选maxsize参数;maxsize不幸的是,它在没有参数集的情况下构造它。

我建议使用传递给构造函数multiprocessing.Pool的复制粘贴进行子类化。multiprocessing.Pool.__init__maxsize_taskqueue

猴子修补对象(池或队列)也可以,但你必须猴子修补pool._taskqueue._maxsizepool._taskqueue._sem所以它会很脆弱:

pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)
于 2012-06-15T18:36:37.570 回答
3

等待是否pool._taskqueue超过所需大小:

import multiprocessing
import time

import numpy as np


def f(a,b):
    return np.linalg.solve(a,b)

def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))

        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()
于 2017-02-25T20:26:13.410 回答
1

这是最佳答案的猴子修补替代方案:

import queue
from multiprocessing.pool import ThreadPool as Pool


class PatchedQueue():
  """
  Wrap stdlib queue and return a Queue(maxsize=...)
  when queue.SimpleQueue is accessed
  """

  def __init__(self, simple_queue_max_size=5000):
    self.simple_max = simple_queue_max_size  

  def __getattr__(self, attr):
    if attr == "SimpleQueue":
      return lambda: queue.Queue(maxsize=self.simple_max)
    return getattr(queue, attr)


class BoundedPool(Pool):
  # Override queue in this scope to use the patcher above
  queue = PatchedQueue()

pool = BoundedPool()
pool.apply_async(print, ("something",))

这在 Python 3.8 中按预期工作,其中多处理池queue.SimpleQueue用于设置任务队列。听起来multiprocessing.Pool自 2.7 以来的实现可能已经改变

于 2021-03-27T19:11:54.463 回答
0

在这种情况下,您可以使用 maxsize 参数添加显式队列并使用queue.put()而不是。pool.apply_async()然后工作进程可以:

for a, b in iter(queue.get, sentinel):
    # process it

如果您想将内存中创建的输入参数/结果的数量限制为大约活动工作进程的数量,那么您可以使用pool.imap*()以下方法:

#!/usr/bin/env python
import multiprocessing
import numpy as np

def f(a_b):
    return np.linalg.solve(*a_b)

def main():
    args = ((np.random.rand(1000,1000), np.random.rand(1000))
            for _ in range(1000))
    p = multiprocessing.Pool()
    for result in p.imap_unordered(f, args, chunksize=1):
        pass
    p.close()
    p.join()

if __name__ == '__main__':
    main()
于 2012-06-15T22:45:45.433 回答