16

我有一个线程将结果写入队列。

在另一个线程(GUI)中,我定期(在 IDLE 事件中)检查队列中是否有结果,如下所示:

def queue_get_all(q):
    items = []
    while 1:
        try:
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

这是一个好方法吗?

编辑:

我问是因为有时等待线程会卡住几秒钟而没有取出新结果。

“卡住”的问题原来是因为我在空闲事件处理程序中进行处理,而没有确保这些事件实际上是通过调用生成的wx.WakeUpIdle,如建议的那样。

4

6 回答 6

19

如果您总是将所有可用的项目从队列中拉出,那么使用队列而不只是一个带锁的列表有什么真正意义吗?IE:

from __future__ import with_statement
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

如果您还单独拉取它们,并利用空队列的阻塞行为,那么您应该使用 Queue,但您的用例看起来要简单得多,上述方法可能会更好地服务。

[Edit2] 我错过了您从空闲循环轮询队列的事实,并且从您的更新中,我发现问题与争用无关,因此以下方法与您的问题并不真正相关. 我把它留了下来,以防有人发现这个有用的阻塞变体:

对于您确实希望在获得至少一个结果之前阻塞的情况,您可以修改上述代码以通过生产者线程发出信号来等待数据可用。例如。

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items
于 2008-10-01T07:25:30.087 回答
14

我认为将所有项目从队列中取出的最简单方法如下:

def get_all_queue_result(queue):

    result_list = []
    while not queue.empty():
        result_list.append(queue.get())

    return result_list
于 2014-09-10T14:36:49.580 回答
9

get_nowait()如果列表为空时调用不返回而导致暂停,我会感到非常惊讶。

可能是您在检查之间发布了大量(可能很大?)项目,这意味着接收线程有大量数据要从中提取Queue?您可以尝试限制一批检索的数量:

def queue_get_all(q):
    items = []
    maxItemsToRetrieve = 10
    for numOfItemsRetrieved in range(0, maxItemsToRetrieve):
        try:
            if numOfItemsRetrieved == maxItemsToRetrieve:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

这将限制接收线程一次最多拉出 10 个项目。

于 2008-10-01T08:45:02.640 回答
4

最简单的方法是使用列表推导:

items = [q.get() for _ in range(q.qsize())]

range功能的使用通常不受欢迎,但我还没有找到更简单的方法。

于 2019-04-04T15:05:01.380 回答
2

如果您已完成对队列的写入,则 qsize 应该可以解决问题,而无需检查每次迭代的队列。

responseList = []
for items in range(0, q.qsize()):
    responseList.append(q.get_nowait())
于 2015-02-17T21:05:44.517 回答
1

我看到您正在使用 get_nowait() 根据文档,“如果一个项目立即可用,则返回 [s] 一个项目,否则引发空异常”

现在,当抛出 Empty 异常时,您碰巧跳出循环。因此,如果队列中没有立即可用的结果,您的函数将返回一个空项目列表。

您是否有理由不使用 get() 方法?可能会出现 get_nowait() 失败的情况,因为队列在同一时刻正在为 put() 请求提供服务。

于 2008-10-01T06:12:36.267 回答