1

假设我有一个由multiprocessing.Pool. 如何允许此任务向Pool执行它添加新任务?例如,

def integers(pool, queue, n1, n2):
  print ("integers(%d)" % n)
  queue.put(n)
  pool.apply_async(integers, (pool, queue, n+1))  # crashes; can't pickle `pool`

def start():
  pool  = multiprocessing.Pool()
  queue = multiprocessing.Queue()
  integers(pool, queue, 1)
  while True:
    yield queue.get()
4

1 回答 1

2

无法腌制 a Pool,因此如果您希望工作人员能够添加任务,则必须找到解决方法。

您可以使用特定的“哨兵”返回值,告诉主程序将新任务添加到Pool

while True:
    ret_value = queue.get()
    if is_sentinel(ret_value):
        pool.apply_asynch(*get_arguments(ret_value))
    yield ret_value

每当is_sentinel返回True值要求您Pool向.get_argumentsPool

此类功能的最简单实现可能是:

def is_sentinel(value):
    """Assume only sentinel values are tuples, or sequences."""
    return isinstance(value, tuple)
    # or, using duck-typing
    try:
        len(value)
    except TypeError:
        return False
    else:
        return True


def get_arguments(value):
    """Assume that the sentinel value *is* the tuple of arguments
    to pass to the Pool.
    """
    return value
    # or return value[1:] if you want to include a "normal" return value

传递给的函数仅在要添加新任务时才apply_asynch返回tuple(或序列),在这种情况下,它不提供任何返回值。添加还提供返回值的可能性非常简单(例如:元组的第一个元素可能是“正常”返回值)。

另一种方法可能是使用第二个队列,工作人员可以在其中放置他们的请求。在每次迭代中,您可以使用该get_nowait()方法查看工作人员是否请求在队列中添加更多作业。


您使用第一种方法的示例:

def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    return value


def integers(queue, n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    if n1 < n2:
        queue.put((integers, (queue, n1+1, n2)))

def start():
    pool = multiprocessing.Pool()
    queue = multiprocessing.Queue()
    m = 0
    n = 100
    pool.apply_asynch(integers, (queue, m, n))
    while True:
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_asynch(*get_arguments(ret_val))
        else:
            yield ret_val

您使用第二种方法的示例:

# needed for queue.Empty exception
import queue

def integers(out_queue, task_queue, n1, n2):
    print("integers(%d)" % n1)
    out_queue.put(n1)
    if n1 < n2:
        task_queue.put((n1+1, n2))


def start():
    pool = multiprocessing.Pool()
    out_queue = multiprocessing.Queue()
    task_queue = multiprocessing.Queue()
    task_queue.put((0, 100))
    while True:
        try:
            # may be safer to use a timeout...
            m, n = task_queue.get_nowait()
            pool.apply_asynch(integers, (out_queue, task_queue, m, n))
        except queue.Empty:
            # no task to perform
            pass
        yield out_queue.get()
于 2013-09-08T07:40:22.490 回答