1

我目前正在使用 fsevents 监视文件夹。每次添加文件时,都会在此文件上执行代码。每隔一秒就会有一个新文件添加到文件夹中。

from fsevents import Observer, Stream

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        # do stuff with fileChanged file

if __name__ == "__main__":
    observer = Observer()   
    observer.start()
    stream = Stream(file_event_callback, 'folder', file_events=True)
    observer.schedule(stream)
    observer.join()

这工作得很好。唯一的问题是,该库正在为添加到文件夹的每个文件构建一个队列。在 file_event_callback 中执行的代码可能需要一秒钟以上的时间。当发生这种情况时,应该跳过队列中的其他项目,以便只使用最新的项目。

如何从队列中跳过项目,以便仅在最后一个完成后使用的文件夹中添加最新的内容?

我首先尝试使用看门狗,但由于它必须在 Mac 上运行,我在让它按我想要的方式工作时遇到了一些麻烦。

4

1 回答 1

0

我不知道你使用的是什么库,当你说“这是建立一个队列......”我不知道你指的是什么“这个”......但一个明显的答案是坚持你自己的队列在它使用的任何东西的前面,所以你可以直接操纵那个队列。例如:

import queue
import threading

def skip_get(q):
    value = q.get(block=True)
    try:
        while True:
            value = q.get(block=False)
    except queue.Empty:
        return value

q = queue.Queue()

def file_event_callback(event):
    # code 256 for adding file to folder
    if event.mask == 256:
        fileChanged = event.name
        q.put(fileChanged)

def consumer():
    while True:
        fileChanged = skip_get(q)
        if fileChanged is None:
            return
        # do stuff with fileChanged

现在,在启动观察者之前,请执行以下操作:

t = threading.Thread(target=consumer)
t.start()

最后:

observer.join()
q.put(None)
t.join()

那么,这是如何工作的呢?

首先,让我们看看消费者方面。当您调用 时q.get(),这会将第一件事从队列中弹出。但如果什么都没有呢?这就是block争论的目的。如果为假,get则会引发queue.Empty异常。如果它是真的,get将永远等待(以线程安全的方式),直到出现某些东西。因此,通过阻塞一次,我们可以处理还没有什么可读取的情况。然后在没有阻塞的情况下循环,我们消耗队列中的任何其他内容,以处理有太多要读取的内容的情况。因为我们不断地重新分配value我们弹出的任何东西,所以我们最终得到的是放在队列中的最后一个东西。

现在,让我们看看生产者方面。当你打电话时q.put(value),那只是value排队。除非你对队列设置了大小限制(我没有),否则这不可能阻塞,所以你不必担心任何这些。但是现在,你如何向消费者线程发出你已经完成的信号?它将q.get(block=True)永远等待;唤醒它的唯一方法是给它一些弹出的价值。通过推送一个哨兵值(在这种情况下,None这很好,因为它作为文件名无效),并让消费者None通过退出来处理它,我们为自己提供了一种很好、干净的关闭方式。(而且因为我们在 之后从不推任何东西None,所以不会意外跳过它。)所以,我们可以只推None,然后确保(除非有任何其他错误)消费者线程最终会退出,这意味着我们可以t.join()等到它退出,而不必担心死锁。


我在上面提到过,您可以更简单地使用Condition. 如果您考虑队列的实际工作方式,它只是一个受条件保护的列表(或双端队列,或其他):消费者等待条件直到有可用的东西,生产者通过将其添加到列表中来提供可用的东西发出条件的信号。如果您只想要最后一个值,那么列表真的没有理由。所以,你可以这样做:

class OneQueue(object):
    def __init__(self):
        self.value = None
        self.condition = threading.Condition()
        self.sentinel = object()
    def get(self):
        with self.condition:
            while self.value is None:
                self.condition.wait()
            value, self.value = self.value, None
            return value
    def put(self, value):
        with self.condition:
            self.value = value
            self.condition.notify()
    def close(self):
        self.put(self.sentinel)

(因为我现在None用来表示没有可用的东西,所以我必须创建一个单独的哨兵来表示我们已经完成了。)

这种设计的问题在于,如果生产者放置了多个值,而消费者太忙而无法处理它们,它可能会错过其中的一些——但在这种情况下,这个“问题”正是您要寻找的。

尽管如此,使用较低级别的工具总是意味着有更多的错误,这对于线程同步尤其危险,因为它涉及的问题很难理解,即使你理解它们也很难调试,所以Queue无论如何,您可能会更好。

于 2014-09-23T06:08:20.507 回答