0

假设我有这样的代码

import time  
from watchdog.observers import Observer  
from watchdog.events import PatternMatchingEventHandler

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*/file.txt"]

    def process(self, event):
        """
        event.event_type 
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """
        # the file will be processed there
        print event.src_path, event.event_type  # print now only for degug
        if event.event_type == 'modified':
            print fname


    def on_modified(self, event):
        self.process(event)

if __name__ == '__main__':
#    args = sys.argv[1:]
    fname = 'file.txt'
    observer = Observer()
    observer.schedule(MyHandler(), path='.')
    observer.start()

    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()

我的目标是让用户patterns在 MyHandler 类的变量中输入 fname。

import time  
from watchdog.observers import Observer  
from watchdog.events import PatternMatchingEventHandler

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*/"+fname]

    def process(self, event):
        """
        event.event_type 
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """
        # the file will be processed there
        print event.src_path, event.event_type  # print now only for degug
        if event.event_type == 'modified':
            print fname


    def on_modified(self, event):
        self.process(event)

if __name__ == '__main__':
#    args = sys.argv[1:]
    fname = 'file.txt'
    observer = Observer()
    observer.schedule(MyHandler(), path='.')
    observer.start()

    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()

我尝试了上面提到的一些方法,但没有奏效。它说fname is not defined。好吧,我也试过patterns = [].append("*/"+fname)了,也没有成功。

然后我尝试创建一个初始化函数

def __init__(fname):
    self.patterns = [].append("*/"+fname)

并像过去一样observer.schedule(MyHandler(fname), path='.')

无赖!也没有成功。我怎样才能做到这一点?

4

1 回答 1

0

您可以使用fnmatch.fnmatch将文件路径与 shell 模式匹配:

In [72]: import fnmatch
In [77]: fnmatch.fnmatch('/root/path/foo', '*/foo')
Out[77]: True

import time
import fnmatch
from watchdog.observers import Observer  
from watchdog.events import PatternMatchingEventHandler

class MyHandler(PatternMatchingEventHandler):

    def init(self, patterns):
        self.patterns = ["*/"+pat for pat in patterns]

    def process(self, event):
        """
        event.event_type 
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """
        path = event.src_path
        print event.src_path, event.event_type  # print now only for degug        
        if any(fnmatch.fnmatch(path, pat) for pat in self.patterns):
            # the file will be processed there
            if event.event_type == 'modified':
                print(fname)

    def on_modified(self, event):
        self.process(event)

if __name__ == '__main__':
    args = sys.argv[1:]
    observer = Observer()
    observer.schedule(MyHandler(args), path='.')
    observer.start()

    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()
于 2019-02-20T22:26:44.130 回答