3

我有一个服务器和几个客户端。它们都共享一个任务和结果 multiprocessing.Queue。但是,每当客户端完成任务并将结果放入结果队列时,我希望服务器查看结果,并在此基础上重新排序任务队列。

这当然意味着从任务队列中弹出所有内容并重新添加。在这个重新排序过程中,我希望客户端阻止接触任务队列。我的问题是如何让服务器识别何时将任务添加到结果队列并通过锁定任务队列并在保护队列的同时重新排序来做出反应。不变的是,在客户端获得新任务之前,服务器必须在返回的每个结果之后重新排序。

我想一个简单(但错误)的方法是让 multiprocessing.Value 充当布尔值,每当添加结果时,客户端都会将其翻转为 True,这意味着已添加结果。服务器可以轮询以获取此值,但最终它可能会错过另一个在轮询之间进入并添加另一个结果的客户端。

任何想法表示赞赏。

**“多线程”标签只是因为它与线程非常相似的想法,我认为这里的进程/线程区别并不重要。

4

1 回答 1

1

让我们尝试一些代码 - 有一些进展总比没有好;-) 部分问题是确保如果结果队列中有任何内容,则不会从任务队列中获取任何内容,对吧?所以队列是紧密相连的。这种方法将两个队列都置于锁的保护之下,并使用条件来避免任何轮询:

设置,在服务器中完成。 taskQ, resultQ,taskCond并且resultCond必须传递给客户端进程(lock不需要显式传递 - 它包含在条件中):

import multiprocessing as mp
taskQ = mp.Queue()
resultQ = mp.Queue()
lock = mp.Lock()
# both conditions share lock
taskCond = mp.Condition(lock)
resultCond = mp.Condition(lock)

客户获得任务;所有客户端都使用此功能。请注意,只要结果队列中有内容,任务就不会被消耗:

def get_task():
    taskCond.acquire()
    while taskQ.qsize() == 0 or resultQ.qsize():
        taskCond.wait()
    # resultQ is empty and taskQ has something
    task = taskQ.get()
    taskCond.release()
    return task

客户有结果:

with resultCond:
    resultQ.put(result)
    # only the server waits on resultCond
    resultCond.notify()

服务器循环:

resultCond.acquire()
while True:
    while resultQ.qsize() == 0:
        resultCond.wait()
    # operations on both queues in all clients are blocked now
    # ... drain resultQ, reorder taskQ ...
    taskCond.notify_all()

笔记:

  1. qsize()通常是概率性的,但是因为所有队列操作都是在持有锁时完成的,所以在这种情况下它是可靠的。

  2. 实际上,因为这里所有的队列操作都受到我们自己的锁的保护,所以真的没有必要使用mp.Queues。例如, anmp.Manager().list()也可以工作(任何共享结构)。当您重新安排任务时,也许列表会更容易使用?

  3. 我不太喜欢的部分是:当服务器这样做时taskCond.notify_all(),一些客户端可能正在等待获取新任务,而其他客户端可能正在等待返回新结果。它们可以按任何顺序运行。一旦任何等待返回结果的客户端有机会,所有等待获取任务的客户端都会阻塞,但在此之前任务将被消耗。当然,这里的“问题”是我们不知道在将某些内容实际添加到结果队列之前等待新结果。

对于最后一个,也许将“客户端有结果”代码更改为:

resultQ.put(result)
with resultCond:
    resultCond.notify()

会更好。不确定。它确实使推理变得更加困难,因为所有队列操作都在我们的锁的保护下完成不再是真的。

于 2013-10-10T18:37:14.133 回答