
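Using the Python watchdog file-system events monitoring library, I noticed that under Windows Server 2003 it enters "polling mode": it stops using asynchronous OS notifications and therefore severely degrades system performance when a large number of files change.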

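I traced the problem to the file watchdog/observers/winapi.py, where the CancelIoEx system call is used to unblock a pending ReadDirectoryChangesW call when the user wants to stop monitoring the watched directory or file: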

(winapi.py)

CancelIoEx = ctypes.windll.kernel32.CancelIoEx
CancelIoEx.restype = ctypes.wintypes.BOOL
CancelIoEx.errcheck = _errcheck_bool
CancelIoEx.argtypes = (
    ctypes.wintypes.HANDLE,  # hObject
    ctypes.POINTER(OVERLAPPED)  # lpOverlapped
)

...
...
...

def close_directory_handle(handle):
    try:
        CancelIoEx(handle, None)  # force ReadDirectoryChangesW to return
    except WindowsError:
        return

The problem with calling CancelIoEx is that it is not available until Windows Server 2008: http://msdn.microsoft.com/en-us/library/windows/desktop/aa363792(v=vs.85).aspx
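Because `CancelIoEx = ctypes.windll.kernel32.CancelIoEx` runs at import time, and ctypes raises AttributeError when a DLL does not export the requested name, on Windows Server 2003 winapi.py presumably fails to load at that line (which would explain why the polling observer gets picked). A hedged sketch of detecting the export instead of assuming it; `find_cancel_io_ex` and `HAVE_CANCEL_IO_EX` are hypothetical names, not part of watchdog:

```python
import ctypes
import sys


def find_cancel_io_ex():
    """Return kernel32.CancelIoEx, or None when it is unavailable.

    CancelIoEx first appeared in Windows Vista / Server 2008, so on
    Windows Server 2003 the attribute lookup fails.
    """
    if sys.platform != "win32":
        return None  # kernel32 only exists on Windows
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    # ctypes raises AttributeError for exports the DLL does not have,
    # so getattr() with a default turns that into None.
    return getattr(kernel32, "CancelIoEx", None)


CancelIoEx = find_cancel_io_ex()
HAVE_CANCEL_IO_EX = CancelIoEx is not None
```

close_directory_handle could then call CancelIoEx only when HAVE_CANCEL_IO_EX is true and fall back to some other wake-up strategy otherwise.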

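One possible alternative is to change close_directory_handle so that it creates a dummy file in the monitored directory, thus unblocking the thread that is waiting for ReadDirectoryChangesW to return.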
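A minimal sketch of that dummy-file workaround, assuming the watcher's notify filter includes file-name changes (so creating or deleting a file completes the pending ReadDirectoryChangesW call). `poke_directory` and the `.wakeup-` prefix are hypothetical choices; the watcher would have to recognise and discard the two events this produces:

```python
import os
import tempfile


def poke_directory(path):
    """Create and immediately delete a throw-away file inside ``path``.

    Returns the name that was used, so the watcher can ignore the
    resulting "added" and "removed" notifications.
    """
    fd, name = tempfile.mkstemp(prefix=".wakeup-", dir=path)
    os.close(fd)  # Windows refuses to delete a file that is still open
    os.remove(name)
    return name
```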

However, I noticed that the CancelIo system call is in fact available in Windows Server 2003:

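Cancels all pending input and output (I/O) operations that are issued by the calling thread for the specified file. The function does not cancel I/O operations that other threads issue for a file handle. To cancel I/O operations from another thread, use the CancelIoEx function.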

But calling CancelIo from the thread that wants to stop monitoring does not affect the waiting thread.

Do you have any idea how to solve this problem? Could threading.enumerate() be used to send a signal to each thread, so that each signal handler calls CancelIo?


1 Answer


The natural approach is to implement a completion routine and call ReadDirectoryChangesW in its overlapped mode. The following example shows how to do it:

RDCW_CALLBACK_F = ctypes.WINFUNCTYPE(None, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.POINTER(OVERLAPPED))

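First, create a WINFUNCTYPE factory, which will be used to generate C-like functions (callable from the Windows API) from Python methods. In this case there is no return value, and the 3 parameters correspond to the FileIOCompletionRoutine header:

VOID CALLBACK FileIOCompletionRoutine(
  _In_     DWORD dwErrorCode,
  _In_     DWORD dwNumberOfBytesTransfered,
  _Inout_  LPOVERLAPPED lpOverlapped
);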
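The same prototype-and-wrap pattern can be tried outside Windows with ctypes.CFUNCTYPE, the cdecl counterpart of WINFUNCTYPE (which exists only on Windows and uses stdcall). In this sketch c_uint32 stands in for DWORD and c_void_p for LPOVERLAPPED, and the wrapper is invoked directly from Python, the way the OS would invoke it once the I/O completes:

```python
import ctypes

# Same shape as FileIOCompletionRoutine: void(DWORD, DWORD, LPOVERLAPPED).
COMPLETION_F = ctypes.CFUNCTYPE(None, ctypes.c_uint32, ctypes.c_uint32,
                                ctypes.c_void_p)

calls = []


def on_complete(error_code, bytes_transferred, overlapped):
    # A NULL pointer argument arrives in Python as None.
    calls.append((error_code, bytes_transferred, overlapped))


# Keep a reference to the wrapper for as long as native code may call it;
# if it is garbage-collected, the next callback crashes the interpreter.
completion_ptr = COMPLETION_F(on_complete)

# Call through the C function pointer, as the OS would.
completion_ptr(0, 42, None)
```

Keeping the wrapper alive is also why the answer stores RDCW_CALLBACK_F(dir_change_callback) in a variable (call2pass) instead of building it inline.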

The callback reference, as well as the overlapped structure, must be added to the ReadDirectoryChangesW argument list:

ReadDirectoryChangesW = ctypes.windll.kernel32.ReadDirectoryChangesW

ReadDirectoryChangesW.restype = ctypes.wintypes.BOOL
ReadDirectoryChangesW.errcheck = _errcheck_bool
ReadDirectoryChangesW.argtypes = (
    ctypes.wintypes.HANDLE,  # hDirectory
    LPVOID,  # lpBuffer
    ctypes.wintypes.DWORD,  # nBufferLength
    ctypes.wintypes.BOOL,  # bWatchSubtree
    ctypes.wintypes.DWORD,  # dwNotifyFilter
    ctypes.POINTER(ctypes.wintypes.DWORD),  # lpBytesReturned
    ctypes.POINTER(OVERLAPPED),  # lpOverlapped
    RDCW_CALLBACK_F  # lpCompletionRoutine (a FileIOCompletionRoutine)
)

From here, we are ready to perform the overlapped system call. Here is a simple callback, useful only to test that everything works:

def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
     print("dir_change_callback! PID:" + str(os.getpid()))
     print("CALLBACK THREAD: " + str(threading.currentThread()))

Prepare and perform the call:

event_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
nbytes = ctypes.wintypes.DWORD()
overlapped_read_dir = OVERLAPPED()
call2pass = RDCW_CALLBACK_F(dir_change_callback)

hand = get_directory_handle(os.path.abspath("/test/"))

def docall():
    ReadDirectoryChangesW(hand, ctypes.byref(event_buffer),
                          len(event_buffer), False,
                          WATCHDOG_FILE_NOTIFY_FLAGS,
                          ctypes.byref(nbytes), 
                          ctypes.byref(overlapped_read_dir), call2pass)

print("Waiting!")
docall()
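A completion routine passed to ReadDirectoryChangesW is queued as an asynchronous procedure call (APC) to the thread that issued the call, and Windows runs it only while that thread is in an alertable wait (SleepEx, WaitForSingleObjectEx and similar, with the alertable flag set to TRUE). A sketch of such a wait; `pump_completion_routines` is a hypothetical helper, while INFINITE and WAIT_IO_COMPLETION are the documented SleepEx values:

```python
import ctypes
import sys

INFINITE = 0xFFFFFFFF
WAIT_IO_COMPLETION = 0x000000C0  # SleepEx result when a completion routine ran


def pump_completion_routines(timeout_ms=INFINITE):
    """Wait alertably so queued completion routines can run on this thread.

    Returns True if SleepEx returned because at least one completion
    routine was executed, False if the timeout elapsed first.
    """
    if sys.platform != "win32":
        raise OSError("I/O completion routines are a Windows-only mechanism")
    kernel32 = ctypes.WinDLL("kernel32")
    sleep_ex = kernel32.SleepEx
    sleep_ex.argtypes = (ctypes.c_uint32, ctypes.c_int)  # DWORD, BOOL
    sleep_ex.restype = ctypes.c_uint32                   # DWORD
    return sleep_ex(timeout_ms, True) == WAIT_IO_COMPLETION
```

Calling pump_completion_routines() right after docall() should let dir_change_callback run on the calling thread, whereas a plain threading.Lock.acquire() does not wait alertably.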

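If you load and execute all this code in the DreamPie interactive shell, you can check that the system call completes and the callback executes, printing the PID and thread after the first change is made under the c:\test directory. Moreover, you will notice that they are the same as those of the main process and thread: although the change event is raised by a separate thread, the callback runs in the same process and thread as our main program (Windows queues a completion routine to the thread that issued the I/O request). This leads to undesired behavior: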

lck = threading.Lock()

def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
     print("dir_change_callback! PID:" + str(os.getpid()))
     print("CALLBACK THREAD: " + str(threading.currentThread()))

...
...
...

lck.acquire()
print("Waiting!")
docall()
lck.acquire()

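This program locks the main thread and the callback never executes. I tried many synchronization tools, even Windows API semaphores, and always got the same behavior. So, finally, I decided to implement the asynchronous call by running ReadDirectoryChangesW in its synchronous configuration inside a separate process, managed and synchronized with the multiprocessing Python library: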

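Calls to get_directory_handle no longer return a handle number provided by the Windows API, but one managed by the winapi library itself; for that, I implemented a handle generator: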

class FakeHandleFactory():
    _hl = threading.Lock()
    _next = 0
    @staticmethod
    def next():
        FakeHandleFactory._hl.acquire()
        ret = FakeHandleFactory._next
        FakeHandleFactory._next += 1
        FakeHandleFactory._hl.release()
        return ret
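The same counter can be written more compactly with itertools.count and a with block, which also guarantees that the lock is released if something raises. A sketch; `next_fake_handle` is a hypothetical name:

```python
import itertools
import threading

_handle_counter = itertools.count()
_handle_lock = threading.Lock()


def next_fake_handle():
    """Return a process-wide unique integer to use as a fake directory handle."""
    # next() on itertools.count is not documented as thread-safe,
    # so the lock keeps concurrent callers from getting the same value.
    with _handle_lock:
        return next(_handle_counter)
```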

Each generated handle must be globally associated with a file-system path:

handle2file = {}

Now, each call to read_directory_changes creates a ReadDirectoryRequest object (derived from multiprocessing.Process):

class ReadDirectoryRequest(multiprocessing.Process):

    def _perform_and_wait4request(self, path, recursive, event_buffer, nbytes):
        hdl = CreateFileW(path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS,
                       None, OPEN_EXISTING, WATCHDOG_FILE_FLAGS, None)
        #print("path: " + path)
        aux_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
        aux_n = ctypes.wintypes.DWORD()
        #print("_perform_and_wait4request! PID:" + str(os.getpid()))
        #print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
        try:
            ReadDirectoryChangesW(hdl, ctypes.byref(aux_buffer),
                              len(event_buffer), recursive,
                              WATCHDOG_FILE_NOTIFY_FLAGS,
                              ctypes.byref(aux_n), None, None)
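        except WindowsError as e:
            # ERROR_OPERATION_ABORTED or any other failure: report zero bytes.
            # Only the local count is reset; rebinding nbytes/event_buffer
            # would break the shared objects written below.
            print("!" + str(e))
            aux_n.value = 0
        # Copy the result into the shared objects read by the parent process
        # (range() instead of xrange() keeps this Python 2/3 compatible).
        nbytes.value = aux_n.value
        for i in range(int(aux_n.value)):
            event_buffer[i] = aux_buffer[i]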
        CloseHandle(hdl)
        try:
            self.lck.release()
        except:
            pass



    def __init__(self, handle, recursive):
        buffer = ctypes.create_string_buffer(BUFFER_SIZE)
        self.event_buffer = multiprocessing.Array(ctypes.c_char, buffer)
        self.nbytes = multiprocessing.Value(ctypes.wintypes.DWORD, 0)
        targetPath = handle2file.get(handle, None)
        super(ReadDirectoryRequest, self).__init__(target=self._perform_and_wait4request, args=(targetPath, recursive, self.event_buffer, self.nbytes))
        self.daemon = True
        self.lck = multiprocessing.Lock()
        self.result = None
        try:
            self.int_class = long
        except NameError:
            self.int_class = int
        if targetPath is None:
            self.result = ([], -1)

    def CancelIo(self):
        try:
            self.result = ([], 0)
            self.lck.release()
        except:
            pass

    def read_changes(self):
        #print("read_changes! PID:" + str(os.getpid()))
        #print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
        if self.result is not None:
            raise Exception("ReadDirectoryRequest object can be used only once!")
        self.lck.acquire()
        self.start()
        self.lck.acquire()
        self.result = (self.event_buffer, self.int_class(self.nbytes.value))
        return self.result

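This class defines a Process that performs the system call and waits until either:

  • a change event has been raised, or
  • the main thread cancels the request by calling the ReadDirectoryRequest object's CancelIo method.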
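The "whichever happens first" rule can be captured with a small one-shot result object: the first of complete() or cancel() wins and any later call is ignored, so a change that arrives just after a cancellation cannot overwrite it (or vice versa). A thread-based sketch with hypothetical names; the answer itself coordinates the same two outcomes through a multiprocessing.Lock:

```python
import threading


class OneShotResult(object):
    """Holds the outcome of one request: either changes or a cancellation."""

    def __init__(self):
        self._guard = threading.Lock()
        self._done = threading.Event()
        self.result = None

    def _settle(self, value):
        with self._guard:
            if self._done.is_set():
                return False  # the request was already settled
            self.result = value
            self._done.set()
            return True

    def complete(self, event_buffer, nbytes):
        """Record the changes reported by ReadDirectoryChangesW."""
        return self._settle((event_buffer, nbytes))

    def cancel(self):
        """Abandon the request, mirroring the ([], 0) result of CancelIo."""
        return self._settle(([], 0))

    def wait(self, timeout=None):
        """Block until settled (or timeout); return the result or None."""
        self._done.wait(timeout)
        return self.result
```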

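Note that the role of

  • get_directory_handle
  • close_directory_handle
  • read_directory_changes

is now to manage the requests. For that, a thread lock and auxiliary data structures are needed: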

rqIndexLck = threading.Lock() # Protects the access to `rqIndex`
rqIndex = {} # Maps handles to request objects sets.

get_directory_handle

def get_directory_handle(path):
    rqIndexLck.acquire()
    ret = FakeHandleFactory.next()
    handle2file[ret] = path
    rqIndexLck.release()
    return ret

close_directory_handle

def close_directory_handle(handle):
    rqIndexLck.acquire()
    rqset4handle = rqIndex.get(handle, None)
    if rqset4handle is not None:
        for rq in rqset4handle:
            rq.CancelIo()
        del rqIndex[handle]
    if handle in handle2file:
        del handle2file[handle]
    rqIndexLck.release()

Last but not least: read_directory_changes

def read_directory_changes(handle, recursive):
    rqIndexLck.acquire()
    rq = ReadDirectoryRequest(handle, recursive)
    set4handle = None
    if handle in rqIndex:
        set4handle = rqIndex[handle]
    else:
        set4handle = set()
        rqIndex[handle] = set4handle
    set4handle.add(rq)
    rqIndexLck.release()
    ret = rq.read_changes()
    rqIndexLck.acquire()
    if rq in set4handle:
        set4handle.remove(rq)
    rqIndexLck.release()
    return ret

answered 2014-09-15T07:47:04.713