让我们尝试一些代码 - 有一些进展总比没有好;-) 部分问题是确保如果结果队列中有任何内容,则不会从任务队列中获取任何内容,对吧?所以队列是紧密相连的。这种方法将两个队列都置于锁的保护之下,并使用条件来避免任何轮询:
设置,在服务器中完成。 taskQ
, resultQ
,taskCond
并且resultCond
必须传递给客户端进程(lock
不需要显式传递 - 它包含在条件中):
import multiprocessing as mp
taskQ = mp.Queue()
resultQ = mp.Queue()
lock = mp.Lock()
# both conditions share lock
taskCond = mp.Condition(lock)
resultCond = mp.Condition(lock)
客户获得任务;所有客户端都使用此功能。请注意,只要结果队列中有内容,任务就不会被消耗:
def get_task():
taskCond.acquire()
while taskQ.qsize() == 0 or resultQ.qsize():
taskCond.wait()
# resultQ is empty and taskQ has something
task = taskQ.get()
taskCond.release()
return task
客户有结果:
with resultCond:
resultQ.put(result)
# only the server waits on resultCond
resultCond.notify()
服务器循环:
resultCond.acquire()
while True:
while resultQ.qsize() == 0:
resultCond.wait()
# operations on both queues in all clients are blocked now
# ... drain resultQ, reorder taskQ ...
taskCond.notify_all()
笔记:
qsize()
通常是概率性的,但是因为所有队列操作都是在持有锁时完成的,所以在这种情况下它是可靠的。
实际上,因为这里所有的队列操作都受到我们自己的锁的保护,所以真的没有必要使用mp.Queue
s。例如, anmp.Manager().list()
也可以工作(任何共享结构)。当您重新安排任务时,也许列表会更容易使用?
我不太喜欢的部分是:当服务器这样做时taskCond.notify_all()
,一些客户端可能正在等待获取新任务,而其他客户端可能正在等待返回新结果。它们可以按任何顺序运行。一旦任何等待返回结果的客户端有机会,所有等待获取任务的客户端都会阻塞,但在此之前任务将被消耗。当然,这里的“问题”是我们不知道在将某些内容实际添加到结果队列之前等待新结果。
对于最后一个,也许将“客户端有结果”代码更改为:
resultQ.put(result)
with resultCond:
resultCond.notify()
会更好。不确定。它确实使推理变得更加困难,因为所有队列操作都在我们的锁的保护下完成不再是真的。