我不知道你使用的是什么库,当你说“这是建立一个队列......”我不知道你指的是什么“这个”......但一个明显的答案是坚持你自己的队列在它使用的任何东西的前面,所以你可以直接操纵那个队列。例如:
import queue
import threading
def skip_get(q):
value = q.get(block=True)
try:
while True:
value = q.get(block=False)
except queue.Empty:
return value
q = queue.Queue()
def file_event_callback(event):
# code 256 for adding file to folder
if event.mask == 256:
fileChanged = event.name
q.put(fileChanged)
def consumer():
while True:
fileChanged = skip_get(q)
if fileChanged is None:
return
# do stuff with fileChanged
现在,在启动观察者之前,请执行以下操作:
t = threading.Thread(target=consumer)
t.start()
最后:
observer.join()
q.put(None)
t.join()
那么,这是如何工作的呢?
首先,让我们看看消费者方面。当您调用 时q.get()
,这会将第一件事从队列中弹出。但如果什么都没有呢?这就是block
争论的目的。如果为假,get
则会引发queue.Empty
异常。如果它是真的,get
将永远等待(以线程安全的方式),直到出现某些东西。因此,通过阻塞一次,我们可以处理还没有什么可读取的情况。然后在没有阻塞的情况下循环,我们消耗队列中的任何其他内容,以处理有太多要读取的内容的情况。因为我们不断地重新分配value
我们弹出的任何东西,所以我们最终得到的是放在队列中的最后一个东西。
现在,让我们看看生产者方面。当你打电话时q.put(value)
,那只是value
排队。除非你对队列设置了大小限制(我没有),否则这不可能阻塞,所以你不必担心任何这些。但是现在,你如何向消费者线程发出你已经完成的信号?它将q.get(block=True)
永远等待;唤醒它的唯一方法是给它一些弹出的价值。通过推送一个哨兵值(在这种情况下,None
这很好,因为它作为文件名无效),并让消费者None
通过退出来处理它,我们为自己提供了一种很好、干净的关闭方式。(而且因为我们在 之后从不推任何东西None
,所以不会意外跳过它。)所以,我们可以只推None
,然后确保(除非有任何其他错误)消费者线程最终会退出,这意味着我们可以t.join()
等到它退出,而不必担心死锁。
我在上面提到过,您可以更简单地使用Condition
. 如果您考虑队列的实际工作方式,它只是一个受条件保护的列表(或双端队列,或其他):消费者等待条件直到有可用的东西,生产者通过将其添加到列表中来提供可用的东西发出条件的信号。如果您只想要最后一个值,那么列表真的没有理由。所以,你可以这样做:
class OneQueue(object):
def __init__(self):
self.value = None
self.condition = threading.Condition()
self.sentinel = object()
def get(self):
with self.condition:
while self.value is None:
self.condition.wait()
value, self.value = self.value, None
return value
def put(self, value):
with self.condition:
self.value = value
self.condition.notify()
def close(self):
self.put(self.sentinel)
(因为我现在None
用来表示没有可用的东西,所以我必须创建一个单独的哨兵来表示我们已经完成了。)
这种设计的问题在于,如果生产者放置了多个值,而消费者太忙而无法处理它们,它可能会错过其中的一些——但在这种情况下,这个“问题”正是您要寻找的。
尽管如此,使用较低级别的工具总是意味着有更多的错误,这对于线程同步尤其危险,因为它涉及的问题很难理解,即使你理解它们也很难调试,所以Queue
无论如何,您可能会更好。