我遇到了以下问题,我的解决方案是创建一个自定义发射器,在自定义观察器中使用它,然后使用这个观察器。
发射器代码:
class CustomEmitter(PollingEmitter):
def __init__(self, event_queue, watch,
timeout=DEFAULT_EMITTER_TIMEOUT,
stat=default_stat, listdir=os.listdir, logger=None):
super(CustomEmitter, self).__init__(event_queue,
watch,
timeout,
stat,
listdir)
self._take_snapshot = lambda: self._take_safe_snapshot(
self.watch.path, self.watch.is_recursive, stat=stat,
listdir=listdir)
self.logger = logger
self.folder_was_missing = None
def _take_safe_snapshot(self, path, is_recursive, stat, listdir):
try:
snapshot = DirectorySnapshot(path, is_recursive,
stat=stat,
listdir=listdir)
if self.folder_was_missing:
self.logger.info("Observer successfully found the missing "
"directory {}.".format(path))
self.folder_was_missing = False
return snapshot
except FileNotFoundError as fe:
if not self.folder_was_missing:
self.folder_was_missing = True
self.logger.info("Observer can't find the directory {}."
"".format(path))
self.logger.exception(fe)
if self.logger.level == logging.DEBUG:
traceback.print_exc()
def queue_events(self, timeout):
# We don't want to hit the disk continuously.
# timeout behaves like an interval for polling emitters.
if self.stopped_event.wait(timeout):
return
with self._lock:
if not self.should_keep_running():
return
# Get event diff between fresh snapshot and previous snapshot.
# Update snapshot.
new_snapshot = self._take_snapshot()
if new_snapshot is None:
return
events = DirectorySnapshotDiff(self._snapshot, new_snapshot)
self._snapshot = new_snapshot
# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
观察者代码:
class TagIDObserver(BaseObserver):
def __init__(self, logger, timeout=DEFAULT_OBSERVER_TIMEOUT):
super(TagIDObserver, self).__init__(emitter_class=TagIDEmitter,
timeout=timeout)
self.logger = logger
def schedule(self, event_handler, path, recursive=False):
"""
Schedules watching a path and calls appropriate methods specified
in the given event handler in response to file system events.
:param event_handler:
An event handler instance that has appropriate event handling
methods which will be called by the observer in response to
file system events.
:type event_handler:
:class:`watchdog.events.FileSystemEventHandler` or a subclass
:param path:
Directory path that will be monitored.
:type path:
``str``
:param recursive:
``True`` if events will be emitted for sub-directories
traversed recursively; ``False`` otherwise.
:type recursive:
``bool``
:return:
An :class:`ObservedWatch` object instance representing
a watch.
"""
with self._lock:
watch = ObservedWatch(path, recursive)
self._add_handler_for_watch(event_handler, watch)
# If we don't have an emitter for this watch already, create it.
if self._emitter_for_watch.get(watch) is None:
emitter = self._emitter_class(event_queue=self.event_queue,
watch=watch,
timeout=self.timeout,
logger=self.logger)
self._add_emitter(emitter)
if self.is_alive():
emitter.start()
self._watches.add(watch)
return watch
此外,如您所见,如果您不需要日志记录,您可以将原始时间表保留在 Observer 中并仅覆盖 __ init __