我需要暂停功能,所以我使用以下观察者:
import time
import contextlib
import watchdog.observers
class PausingObserver(watchdog.observers.Observer):
def dispatch_events(self, *args, **kwargs):
if not getattr(self, '_is_paused', False):
super(PausingObserver, self).dispatch_events(*args, **kwargs)
def pause(self):
self._is_paused = True
def resume(self):
time.sleep(self.timeout) # allow interim events to be queued
self.event_queue.queue.clear()
self._is_paused = False
@contextlib.contextmanager
def ignore_events(self):
self.pause()
yield
self.resume()
然后我可以直接使用pause()
andresume()
方法暂停观察者,但我的主要用例是当我只想忽略由写入我正在观看的目录引起的任何事件时,我使用上下文管理器:
import os
import datetime
import watchdog.events
class MyHandler(watchdog.events.FileSystemEventHandler):
def on_modified(self, event):
with OBSERVER.ignore_events():
with open('./watchdir/modifications.log', 'a') as f:
f.write(datetime.datetime.now().strftime("%H:%M:%S") + '\n')
if __name__ == '__main__':
watchdir = 'watchdir'
if not os.path.exists(watchdir):
os.makedirs(watchdir)
OBSERVER = PausingObserver()
OBSERVER.schedule(MyHandler(), watchdir, recursive=True)
OBSERVER.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
OBSERVER.stop()
OBSERVER.join()
您可以通过将两个代码块保存在一个文件中、运行它并在创建的“watchdir”目录中添加/编辑/删除文件来测试这一点。您修改的时间戳将附加到“watchdir/modifications.log”。
此功能似乎内置于 pyinotify中,但该库仅适用于 linux 并且看门狗独立于操作系统。