371

我有一个由另一个进程写入的日志文件,我想观察它的变化。每次发生更改时,我都想读取新数据以对其进行一些处理。

最好的方法是什么?我希望 PyWin32 库中有某种钩子。我找到了该win32file.FindNextChangeNotification功能,但不知道如何要求它观看特定文件。

如果有人做过这样的事情,我将非常感激听到如何...

[编辑]我应该提到我正在寻求不需要轮询的解决方案。

[编辑]诅咒!似乎这不适用于映射的网络驱动器。我猜windows不会像在本地磁盘上那样“听到”文件的任何更新。

4

28 回答 28

311

你试过看门狗吗?

用于监视文件系统事件的 Python API 库和 shell 实用程序。

目录监控变得容易

  • 一个跨平台的 API。
  • 用于响应目录更改运行命令的 shell 工具。

通过快速入门中的一个简单示例快速入门...

于 2011-01-14T11:52:26.400 回答
114

如果轮询对您来说足够好,我会观察“修改时间”文件统计信息是否发生变化。要阅读它:

os.stat(filename).st_mtime

(另请注意,Windows 本机更改事件解决方案并非在所有情况下都有效,例如在网络驱动器上。)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
于 2008-10-08T11:34:14.850 回答
81

您是否已经查看过http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html上提供的文档?如果您只需要它在 Windows 下工作,那么第二个示例似乎正是您想要的(如果您将目录的路径与您想要观看的文件之一交换)。

否则,轮询可能是唯一真正独立于平台的选项。

注意:我没有尝试过任何这些解决方案。

于 2008-10-08T11:29:43.330 回答
53

如果您想要一个多平台解决方案,请检查QFileSystemWatcher。这是一个示例代码(未清理):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)
于 2011-03-17T13:45:31.360 回答
32

它不应该在 Windows 上工作(也许用 cygwin ?),但对于 unix 用户,你应该使用“fcntl”系统调用。这是 Python 中的一个示例。如果您需要用 C 编写它,它几乎是相同的代码(相同的函数名称)

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)
于 2009-01-23T16:08:40.207 回答
20

查看pyinotify

inotify 在较新的 linux 中替换 dnotify (来自较早的答案),并允许文件级而不是目录级监视。

于 2010-06-13T05:12:49.907 回答
13

好吧,在对 Tim Golden 的脚本进行了一些修改之后,我得到了以下似乎运行良好的内容:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

它可能与负载更多的错误检查有关,但对于简单地查看日志文件并在将其吐出到屏幕之前对其进行一些处理,这很有效。

感谢大家的投入 - 很棒的东西!

于 2008-10-08T14:05:53.203 回答
12

为了查看带有轮询和最小依赖项的单个文件,这里有一个完整的示例,基于Deestan的回答(上图):

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going
于 2018-02-27T11:30:25.837 回答
9

检查类似问题的回答。您可以在 Python 中尝试相同的循环。该页面建议:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

另请参阅问题tail() a file with Python

于 2008-10-08T11:28:12.243 回答
8

这是 Kender 代码的简化版本,它似乎具有相同的技巧,并且不导入整个文件:

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line
于 2009-12-08T16:09:42.800 回答
8

这是 Tim Goldan 脚本的另一个修改,它在 unix 类型上运行,并通过使用 dict (file=>time) 添加了一个简单的文件修改观察程序。

用法:whateverName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print('Watching {}..'.format(path_to_watch))

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print('Added: {}'.format(', '.join(added)))
        if removed: print('Removed: {}'.format(', '.join(removed)))
        if modified: print('Modified: {}'.format(', '.join(modified)))

        before = after
于 2013-02-25T16:05:07.080 回答
7

好吧,由于您使用的是 Python,您可以打开一个文件并继续读取其中的行。

f = open('file.log')

如果读取的行不为空,则处理它。

line = f.readline()
if line:
    // Do what you want with the line

您可能错过了继续致电readlineEOF 是可以的。在这种情况下,它只会继续返回一个空字符串。并且当某些内容附加到日志文件时,读取将根据您的需要从停止的位置继续。

如果您正在寻找使用事件或特定库的解决方案,请在您的问题中指定。否则,我认为这个解决方案很好。

于 2008-10-08T12:18:25.980 回答
7

对我来说最简单的解决方案是使用看门狗的工具 watchmedo

https://pypi.python.org/pypi/watchdog我现在有一个进程可以在目录中查找 sql 文件并在必要时执行它们。

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.
于 2014-06-25T13:43:37.950 回答
4

正如您在Tim Golden 的文章中所看到的,由Horst Gutmann指出,WIN32 相对复杂并且监视目录,而不是单个文件。

我建议您研究IronPython,这是一个.NET python 实现。使用 IronPython,您可以使用所有.NET功能 - 包括

System.IO.FileSystemWatcher

它使用简单的事件接口处理单个文件。

于 2008-10-08T11:42:12.247 回答
2
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...
于 2013-09-22T18:40:48.793 回答
2

这是检查文件更改的示例。一种可能不是最好的方法,但它肯定是一种捷径。

对源进行更改时重新启动应用程序的便捷工具。我在玩 pygame 时做了这个,所以我可以在文件保存后立即看到效果。

在 pygame 中使用时,请确保将“while”循环中的内容放置在您的游戏循环中,即更新或其他任何内容中。否则,您的应用程序将陷入无限循环,您将看不到游戏更新。

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

如果您想要我在网上找到的重启代码。这里是。(与问题无关,尽管它可能会派上用场)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

让电子做你想让他们做的事,玩得开心。

于 2014-04-20T11:11:19.840 回答
2

似乎没有人发布fswatch。它是一个跨平台的文件系统观察者。只需安装它,运行它并按照提示进行操作。

我已经将它与 python 和 golang 程序一起使用,并且它可以正常工作。

于 2019-03-11T13:48:17.937 回答
2

由于我已在全球范围内安装它,因此我最喜欢的方法是使用 nodemon。如果您的源代码在 中src,并且您的入口点是src/app.py,那么它很简单:

nodemon -w 'src/**' -e py,html --exec python src/app.py

... 在哪里-e py,html可以控制要监视更改的文件类型。

于 2020-09-29T16:45:01.360 回答
1

这是一个示例,旨在查看每秒写入不超过一行但通常要少得多的输入文件。目标是将最后一行(最近的写入)附加到指定的输出文件。我从我的一个项目中复制了这个,只是删除了所有不相关的行。您必须填写或更改缺少的符号。

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

当然,包含 QMainWindow 类并不是严格要求的,即。您可以单独使用 QFileSystemWatcher。

于 2016-09-21T01:53:44.947 回答
0

最好和最简单的解决方案是使用 pygtail: https ://pypi.python.org/pypi/pygtail

from pygtail import Pygtail
import sys

while True:
    for line in Pygtail("some.log"):
        sys.stdout.write(line)
于 2018-01-31T01:06:58.380 回答
0

您还可以使用一个名为repyt的简单库,这是一个示例:

repyt ./app.py
于 2018-11-19T13:41:32.373 回答
0

相关@4Oh4 解决方案平滑更改要观看的文件列表;

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going
于 2019-06-27T09:40:21.527 回答
0
import inotify.adapters
from datetime import datetime


LOG_FILE='/var/log/mysql/server_audit.log'


def main():
    start_time = datetime.now()
    while True:
        i = inotify.adapters.Inotify()
        i.add_watch(LOG_FILE)
        for event in i.event_gen(yield_nones=False):
            break
        del i

        with open(LOG_FILE, 'r') as f:
            for line in f:
                entry = line.split(',')
                entry_time = datetime.strptime(entry[0],
                                               '%Y%m%d %H:%M:%S')
                if entry_time > start_time:
                    start_time = entry_time
                    print(entry)


if __name__ == '__main__':
    main()
于 2021-05-29T20:49:46.350 回答
0

最简单的解决方案是在一段时间后获取同一文件的两个实例并比较它们。你可以试试这样的

    while True:
        # Capturing the two instances models.py after certain interval of time
        print("Looking for changes in " + app_name.capitalize() + " models.py\nPress 'CTRL + C' to stop the program")
        with open(app_name.capitalize() + '/filename', 'r+') as app_models_file:
            filename_content = app_models_file.read()
        time.sleep(5)
        with open(app_name.capitalize() + '/filename', 'r+') as app_models_file_1:
            filename_content_1 = app_models_file_1.read()
        # Comparing models.py after certain interval of time
        if filename_content == filename_content_1:
            pass
        else:
            print("You made a change in " + app_name.capitalize() + " filename.\n")
            cmd = str(input("Do something with the file?(y/n):"))
            if cmd == 'y':
                # Do Something
            elif cmd == 'n':
                # pass or do something
            else:
                print("Invalid input")
于 2022-02-09T16:22:31.497 回答
0

我的理解是您想不断检查是否在特定文件上写入了某种数据,这是一个解决方案:

import os
filename="myfile.txt" # Add full path if file is located in another directory
initial_filesize=os.path.getsize(filename) # Getting size of the file for comparition 
while 1:
    final_filesize=os.path.getsize(filename)
    if final_filsize<intial_filesize or final_filesize>initial_filesize:
        print("change in file")
        break
        # You execute your code here
    else:
        pass

上面的代码工作如下:

  1. 首先我们使用了OS module为了与系统或操作系统进行交互。
  2. 在进入循环之前,我们已经检索了文件的原始文件大小并将其命名为initial_filesize.
  3. 现在我们使用While loop,在其中我们不断检索文件的文件大小,如果进行了更改,它将进入if statement.

So basically, we have just used the file size to check if changes are made in the file, pretty simple and this method can work for almost every file type.

于 2022-02-10T10:28:18.823 回答
-2

如果您使用的是 Windows,请创建此 POLL.CMD 文件

@echo off
:top
xcopy /m /y %1 %2 | find /v "File(s) copied"
timeout /T 1 > nul
goto :top

然后您可以键入“poll dir1 dir2”,它会将所有文件从 dir1 复制到 dir2 并每秒检查一次更新。

“查找”是可选的,只是为了减少控制台的噪音。

这不是递归的。也许您可以在 xcopy 上使用 /e 使其递归。

于 2020-08-17T15:23:44.563 回答
-4

我不知道任何 Windows 特定功能。您可以尝试每秒/分钟/小时获取文件的 MD5 哈希(取决于您需要多快)并将其与最后一个哈希进行比较。当它不同时,您知道文件已更改并且您读出了最新的行。

于 2008-10-08T11:25:57.270 回答
-6

我会尝试这样的事情。

    try:
            f = open(filePath)
    except IOError:
            print "No such file: %s" % filePath
            raw_input("Press Enter to close window")
    try:
            lines = f.readlines()
            while True:
                    line = f.readline()
                    try:
                            if not line:
                                    time.sleep(1)
                            else:
                                    functionThatAnalisesTheLine(line)
                    except Exception, e:
                            # handle the exception somehow (for example, log the trace) and raise the same exception again
                            raw_input("Press Enter to close window")
                            raise e
    finally:
            f.close()

循环检查自上次读取文件以来是否有新行 - 如果有,则将其读取并传递给functionThatAnalisesTheLine函数。如果没有,脚本将等待 1 秒并重试该过程。

于 2008-10-08T11:26:44.960 回答