我正在编写一个应用程序,它必须能够监视文件系统目录以了解 python 中的更改,为此我使用 MacFSEvents,这是 FSEvents C API 上的 python 包装器,出于明显的效率原因,轮询不能成为一种选择由于此代码可以负责递归地监视大量文件,因此该模块确实可以工作,并且这样的代码会产生预期的结果
from fsevents import Observer, Stream
def callback(FSEvent, mask=0, cookie=0):
"""
the callback that gets called when an event occurs
:param FSEvent: file or dir that originated the event
:param mask: a bitMask that specifies the type of the event
:param cookie: contains info to bind a moved_to to a moved_from event
"""
print(FSEvent)
print('\n')
print(mask)
print('\n')
if __name__ == "__main__":
try:
obs = Observer()
obs.start()
str = Stream(callback, "/Users/filipposavi/ass", file_events=True)
obs.schedule(str)
except KeyboardInterrupt:
obs.stop()
obs.join
但是,当我将它放入应该管理(相对)低级文件系统详细信息的类中时,不会调用回调,在下面您可以找到不起作用的代码
import hashlib
import os
import fsevents
from database import Db
BUFSIZE = 2 ** 3
chunksize = 1024
IN_CREATE = 0x00000100
IN_ATTRIB = 0x00000004
IN_MODIFY = 0x00000002
IN_DELETE = 0x00000200
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
class FsCore(object):
def __init__(self):
self.db = Db()
def callback(self, FSEvent, mask, cookie=0):
"""
the callback that gets called when an event occurs
:param FSEvent: file or dir that originated the event
:param mask: a bitMask that specifies the type of the event
:param cookie: contains info to bind a moved_to to a moved_from event
"""
print('ciao')
if (mask & IN_CREATE) == IN_CREATE:
self.on_created(FSEvent)
if (mask & IN_ATTRIB) == IN_ATTRIB:
self.on_attrib(FSEvent)
if (mask & IN_DELETE) == IN_DELETE:
self.on_deleted(FSEvent)
if (mask & IN_MODIFY) == IN_MODIFY:
self.on_modified(FSEvent)
if (mask & IN_MOVED_FROM) == IN_MOVED_FROM:
self.on_moved(FSEvent, cookie, False)
if (mask & IN_MOVED_TO) == IN_MOVED_TO:
def startFSEvents(self, path):
"""
:param path:
"""
self.observer = fsevents.Observer()
self.observer.start()
self.stream = fsevents.Stream(self.callback, path, file_events=True)
self.observer.schedule(self.stream)
def stopFSEvents(self):
"""
"""
self.observer.unschedule(self.stream)
self.observer.join()
self.observer.stop()
这是应该触发事件/回调的代码(单元测试的一部分)
self.fs = FsCore()
self.fs.startFSEvents(self.path)
self.database = Db()
path = self.path + "/tmp"
file = open(path, mode='a')
file.write("ciao")
file.flush()
file.close()
if not self.database.getFile(path):
self.fail("onCreated callback doesn't works or it's not called at all")
希望你们能提示我正确的方向,因为我对python有点新手,所以我很坚持