无法腌制 a Pool
,因此如果您希望工作人员能够添加任务,则必须找到解决方法。
您可以使用特定的“哨兵”返回值,告诉主程序将新任务添加到Pool
:
while True:
ret_value = queue.get()
if is_sentinel(ret_value):
pool.apply_asynch(*get_arguments(ret_value))
yield ret_value
每当is_sentinel
返回True
值要求您Pool
向.get_arguments
Pool
此类功能的最简单实现可能是:
def is_sentinel(value):
"""Assume only sentinel values are tuples, or sequences."""
return isinstance(value, tuple)
# or, using duck-typing
try:
len(value)
except TypeError:
return False
else:
return True
def get_arguments(value):
"""Assume that the sentinel value *is* the tuple of arguments
to pass to the Pool.
"""
return value
# or return value[1:] if you want to include a "normal" return value
传递给的函数仅在要添加新任务时才apply_asynch
返回tuple
(或序列),在这种情况下,它不提供任何返回值。添加还提供返回值的可能性非常简单(例如:元组的第一个元素可能是“正常”返回值)。
另一种方法可能是使用第二个队列,工作人员可以在其中放置他们的请求。在每次迭代中,您可以使用该get_nowait()
方法查看工作人员是否请求在队列中添加更多作业。
您使用第一种方法的示例:
def is_sentinel(value):
return isinstance(value, tuple)
def get_arguments(value):
return value
def integers(queue, n1, n2):
print("integers(%d)" % n1)
queue.put(n1)
if n1 < n2:
queue.put((integers, (queue, n1+1, n2)))
def start():
pool = multiprocessing.Pool()
queue = multiprocessing.Queue()
m = 0
n = 100
pool.apply_asynch(integers, (queue, m, n))
while True:
ret_val = queue.get()
if is_sentinel(ret_val):
pool.apply_asynch(*get_arguments(ret_val))
else:
yield ret_val
您使用第二种方法的示例:
# needed for queue.Empty exception
import queue
def integers(out_queue, task_queue, n1, n2):
print("integers(%d)" % n1)
out_queue.put(n1)
if n1 < n2:
task_queue.put((n1+1, n2))
def start():
pool = multiprocessing.Pool()
out_queue = multiprocessing.Queue()
task_queue = multiprocessing.Queue()
task_queue.put((0, 100))
while True:
try:
# may be safer to use a timeout...
m, n = task_queue.get_nowait()
pool.apply_asynch(integers, (out_queue, task_queue, m, n))
except queue.Empty:
# no task to perform
pass
yield out_queue.get()