4

我很高兴使用看门狗包,特别是PollingObserver来监视目录中的文件事件。它工作得很好 - 直到我正在观看的目录被删除。然后发生的是轮询目录的代码在不存在的目录上调用stat()并引发异常。处理这个问题的最佳方法是什么?我看不到如何捕获此异常,因为它位于单独的线程中。

示例代码:

import sys
import time
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    event_handler = LoggingEventHandler()
    observer = PollingObserver()
    print("Watching: ", path)
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        observer.join()

要明白我的意思,请将现有目录作为参数传递,然后将其删除。

4

1 回答 1

0

我遇到了以下问题,我的解决方案是创建一个自定义发射器,在自定义观察器中使用它,然后使用这个观察器。

发射器代码:

class CustomEmitter(PollingEmitter):

    def __init__(self, event_queue, watch,
                 timeout=DEFAULT_EMITTER_TIMEOUT,
                 stat=default_stat, listdir=os.listdir, logger=None):
        super(CustomEmitter, self).__init__(event_queue,
                                            watch,
                                            timeout,
                                            stat,
                                            listdir)
        self._take_snapshot = lambda: self._take_safe_snapshot(
            self.watch.path, self.watch.is_recursive, stat=stat,
            listdir=listdir)
        self.logger = logger
        self.folder_was_missing = None

def _take_safe_snapshot(self, path, is_recursive, stat, listdir):
    try:
        snapshot = DirectorySnapshot(path, is_recursive,
                                     stat=stat,
                                     listdir=listdir)
        if self.folder_was_missing:
            self.logger.info("Observer successfully found the missing "
                             "directory {}.".format(path))
        self.folder_was_missing = False
        return snapshot
    except FileNotFoundError as fe:
        if not self.folder_was_missing:
            self.folder_was_missing = True
            self.logger.info("Observer can't find the directory {}."
                             "".format(path))
            self.logger.exception(fe)
            if self.logger.level == logging.DEBUG:
                traceback.print_exc()

    def queue_events(self, timeout):

        # We don't want to hit the disk continuously.
        # timeout behaves like an interval for polling emitters.
        if self.stopped_event.wait(timeout):
            return

        with self._lock:
            if not self.should_keep_running():
                return

            # Get event diff between fresh snapshot and previous snapshot.
            # Update snapshot.
            new_snapshot = self._take_snapshot()
            if new_snapshot is None:
                return
            events = DirectorySnapshotDiff(self._snapshot, new_snapshot)
            self._snapshot = new_snapshot

            # Files.
            for src_path in events.files_deleted:
                self.queue_event(FileDeletedEvent(src_path))
            for src_path in events.files_modified:
                self.queue_event(FileModifiedEvent(src_path))
            for src_path in events.files_created:
                self.queue_event(FileCreatedEvent(src_path))
            for src_path, dest_path in events.files_moved:
                self.queue_event(FileMovedEvent(src_path, dest_path))

            # Directories.
            for src_path in events.dirs_deleted:
                self.queue_event(DirDeletedEvent(src_path))
            for src_path in events.dirs_modified:
                self.queue_event(DirModifiedEvent(src_path))
            for src_path in events.dirs_created:
                self.queue_event(DirCreatedEvent(src_path))
            for src_path, dest_path in events.dirs_moved:
                self.queue_event(DirMovedEvent(src_path, dest_path))

观察者代码:

class TagIDObserver(BaseObserver):
    def __init__(self, logger, timeout=DEFAULT_OBSERVER_TIMEOUT):
        super(TagIDObserver, self).__init__(emitter_class=TagIDEmitter,
                                        timeout=timeout)
        self.logger = logger

    def schedule(self, event_handler, path, recursive=False):
        """
        Schedules watching a path and calls appropriate methods specified
        in the given event handler in response to file system events.

        :param event_handler:
            An event handler instance that has appropriate event handling
            methods which will be called by the observer in response to
            file system events.
        :type event_handler:
            :class:`watchdog.events.FileSystemEventHandler` or a subclass
        :param path:
            Directory path that will be monitored.
        :type path:
            ``str``
        :param recursive:
            ``True`` if events will be emitted for sub-directories
            traversed recursively; ``False`` otherwise.
        :type recursive:
            ``bool``
        :return:
            An :class:`ObservedWatch` object instance representing
            a watch.
        """
        with self._lock:
            watch = ObservedWatch(path, recursive)
            self._add_handler_for_watch(event_handler, watch)

        # If we don't have an emitter for this watch already, create it.
        if self._emitter_for_watch.get(watch) is None:
            emitter = self._emitter_class(event_queue=self.event_queue,
                                          watch=watch,
                                          timeout=self.timeout,
                                          logger=self.logger)
            self._add_emitter(emitter)
            if self.is_alive():
                emitter.start()
            self._watches.add(watch)
        return watch

此外,如您所见,如果您不需要日志记录,您可以将原始时间表保留在 Observer 中并仅覆盖 __ init __

于 2018-05-28T16:58:02.033 回答