0

我正在使用 Watchdog 监视某个文件夹中新出现的 .hdf5 文件。检测到新文件后,程序会反复尝试打开它,以判断该文件是否已经写入完成;写入完成后,返回该文件的路径供后续使用。问题是,运行 main.py 时程序报错。首先是 watchdog_file.py 的代码:

import time
import traceback

import h5py
import queue
from typing import Union

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent


class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that queues the paths of newly created .hdf5 files.

    The paths are placed on ``self.file_queue`` so that a consumer running in
    another thread can pick them up with ``file_queue.get()``.
    """

    def __init__(self):
        super().__init__()
        self.file_queue = queue.Queue()

    # callback for File/Directory created event, called by Observer.
    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        """Queue ``event.src_path`` when a new ``.hdf5`` file was created.

        :param event: the "created" event delivered by the observer.
        """
        # The previous check compared the last 3 characters with the
        # 5-character string ".hdf5", which can never match, so nothing was
        # ever queued.  endswith() tests the whole suffix; lower() also accepts
        # ".HDF5".  Directories are skipped because they cannot be opened as
        # h5 files.
        if not event.is_directory and event.src_path.lower().endswith(".hdf5"):
            self.file_queue.put(event.src_path)


class ObserverWrapper:
    """Encapsulated Observer boilerplate.

    Schedules a ``NewFileHandler`` on ``path`` and starts observing as soon as
    the wrapper is constructed.  Finished files are retrieved with
    ``wait_for_file()``.
    """

    def __init__(self, path: str, recursive=True):
        """
        :param path: directory to watch.
        :param recursive: if true, sub-directories are watched as well.
        """
        self.path = path
        self.recursive = recursive
        # Tracks whether the observer is running so start() can be a no-op
        # when called again.
        self._started = False

        self.observer = Observer()
        self.handler = NewFileHandler()

        self.observer.schedule(self.handler, path=path, recursive=recursive)

        self.start()

    def start(self):
        """
        Starts observing for filesystem events. Does not block.

        ``__init__`` already calls this, so callers normally do not need to.
        Calling it again while the observer is running is a harmless no-op
        instead of a second ``observer.start()``.
        """
        if self._started:
            return

        self.observer.start()
        self._started = True

    def stop(self):
        """
        Stops the observer and waits for it to finish.
        """

        self.observer.stop()
        self.observer.join()
        self._started = False

    def wait_for_file(self):
        """
        Block until a new file is reported, then wait for it to become readable.

        The next path is taken from the handler's queue and opened read-only
        with h5py.  An ``OSError`` is treated as "file is still being written"
        and the open is retried once per ``retry_interval_seconds`` up to
        ``max_retry_count`` times.

        :return: the path of the file once it could be opened, or ``None`` if
            the retry limit was reached or an unexpected error occurred.
        """

        # 30 retries, 1 second apart: about 30 seconds of waiting at most.
        # Files are usually finished within 20-30 seconds.
        max_retry_count = 30
        retry_interval_seconds = 1

        # wait for file to be added
        file_path = self.handler.file_queue.get(block=True)

        # Keep trying to open the file.  Previously there was no loop here, so
        # after the first failed attempt the method fell through and returned
        # None without ever retrying.
        retry_count = 0
        while True:
            try:
                file = h5py.File(file_path, "r")
            except OSError:
                if retry_count >= max_retry_count:
                    print(f"h5 file <{file_path}> reached max retry count, skipping")
                    return None
                retry_count += 1
                print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                time.sleep(retry_interval_seconds)
            except Exception as err:
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                traceback.print_exc()
                # Retrying is unlikely to help for an unknown error.
                return None
            else:
                file.close()
                return file_path

下面是 main.py 中调用它的代码:

# ObserverWrapper.__init__ already calls start(), so the observer must not be
# started a second time here: the wrapped observer is thread-based (it is
# stopped with stop()/join()), and a thread may only be started once.
self.listener = watchdog_search.ObserverWrapper("/home/path/to/folder")
self.on_finished_run(self.listener.wait_for_file())  # uses the finished .hdf5 file; not relevant to the error at hand

我得到的错误如下:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/home/anaconda3/envs/img/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/anaconda3/envs/img/lib/python3.8/site-packages/watchdog/observers/inotify_buffer.py", line 88, in run
    inotify_events = self._inotify.read_events()
  File "/home/anaconda3/envs/img/lib/python3.8/site-packages/watchdog/observers/inotify_c.py", line 285, in read_events
    event_buffer = os.read(self._inotify_fd, event_buffer_size)
OSError: [Errno 9] Bad file descriptor
4

1 回答 1

0

我通过重写代码解决了这个问题:

class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that forwards every "created" event to a queue."""

    def __init__(self, q, *args, **kwargs):
        """
        :param q: queue-like object; each created event is passed to ``q.put``.
        Any further arguments are handed to the base class unchanged.
        """
        super().__init__(*args, **kwargs)
        self._q = q

    def on_created(self, event):
        """Put the unfiltered event object on the queue."""
        sink = self._q
        sink.put(event)

class Worker(QObject):
    """Watches a directory and announces every .hdf5 file that finished writing.

    ``new_file`` is emitted with ``(full_path, file_name)`` once a newly
    created ``.hdf5`` file could be opened successfully.
    """

    new_file = pyqtSignal(str, str)

    def __init__(self, path):
        """
        :param path: directory to watch (sub-directories included).
        """
        super().__init__()
        self._q = queue.Queue()
        # Keep the observer on the instance so it can be stopped and joined
        # later; as a local variable it was unreachable after __init__.
        self._observer = Observer()
        handler = NewFileHandler(self._q)
        self._observer.schedule(handler, path=path, recursive=True)
        # starts a background thread! Thus we need to wait for the
        # queue to receive the events in work.
        self._observer.start()

    def work(self):
        """
        Consume queued events forever and emit ``new_file`` for finished files.

        For each "created" event on a ``.hdf5`` path the file is opened
        read-only with h5py.  An ``OSError`` is treated as "still being
        written" and retried every ``retry_interval_seconds`` up to
        ``max_retry_count`` times.  This method never returns.
        """
        # Upper bound on verifying a file is finished: 350 * 0.01 s of sleeping.
        max_retry_count = 350
        retry_interval_seconds = .01  # try the file every hundredth of a second

        while True:
            event = self._q.get()
            if not (event.event_type == "created" and event.src_path.lower().endswith(".hdf5")):
                continue

            retry_count = 0
            while True:
                try:
                    file = h5py.File(event.src_path, "r")
                    file.close()
                except OSError:
                    if retry_count < max_retry_count:
                        retry_count += 1
                        print(f"h5 file <{event.src_path}> is locked, retrying {retry_count}/{max_retry_count}")
                        time.sleep(retry_interval_seconds)
                    else:
                        print(f"h5 file <{event.src_path}> reached max retry count, skipping")
                        break
                except Exception as err:
                    print(f"Got unexpected Error <{type(err).__name__}> while opening <{event.src_path}> ")
                    traceback.print_exc()
                    # Give up on this file.  Without this break an unexpected
                    # error made the loop spin forever with no delay.
                    break
                else:
                    self.new_file.emit(event.src_path, os.path.basename(event.src_path))
                    break
于 2021-11-03T22:03:32.937 回答