观察增长文件尾部是否出现某些关键字的pythonic方法是什么?
在 shell 中,我可能会说:
tail -f "$file" | grep "$string" | while read hit; do
#stuff
done
好吧,最简单的方法是不断地从文件中读取,检查新内容并测试命中。
import time
def watch(fn, words):
fp = open(fn, 'r')
while True:
new = fp.readline()
# Once all lines are read this just returns ''
# until the file changes and a new line appears
if new:
for word in words:
if word in new:
yield (word, new)
else:
time.sleep(0.5)
fn = 'test.py'
words = ['word']
for hit_word, hit_sentence in watch(fn, words):
print "Found %r in line: %r" % (hit_word, hit_sentence)
readline
如果您知道您的数据将成行显示,则此解决方案适用。
如果数据是某种流,则需要一个缓冲区,大于word
您要查找的最大缓冲区,然后先填充它。这样就有点复杂了……
def tail(f):
f.seek(0, 2)
while True:
line = f.readline()
if not line:
time.sleep(0.1)
continue
yield line
def process_matches(matchtext):
while True:
line = (yield)
if matchtext in line:
do_something_useful() # email alert, etc.
list_of_matches = ['ERROR', 'CRITICAL']
matches = [process_matches(string_match) for string_match in list_of_matches]
for m in matches: # prime matches
m.next()
while True:
auditlog = tail( open(log_file_to_monitor) )
for line in auditlog:
for m in matches:
m.send(line)
我用它来监视日志文件。在完整的实现中,我将 list_of_matches 保存在一个配置文件中,以便它可以用于多种用途。在我的增强列表中,支持正则表达式而不是简单的“in”匹配。
编辑:正如下面的评论所指出的,O_NONBLOCK
不适用于磁盘上的文件。如果其他人来寻找来自套接字或命名管道或其他进程的尾部数据,这仍然会有所帮助,但它不能回答所提出的实际问题。原始答案保留在下面以供后代使用。(调用 tail 和 grep 会起作用,但无论如何都不是答案。)
要么打开文件O_NONBLOCK
并使用它select
来轮询读取可用性,然后read
读取新数据和字符串方法以过滤文件末尾的行......或者只是使用subprocess
模块并让tail
你grep
完成工作你会在壳里。
您可以使用 select 来轮询文件中的新内容。
def tail(filename, bufsize = 1024):
fds = [ os.open(filename, os.O_RDONLY) ]
while True:
reads, _, _ = select.select(fds, [], [])
if 0 < len(reads):
yield os.read(reads[0], bufsize)
你可以使用pytailf:简单的 python tail -f wrapper
from tailf import tailf
for line in tailf("myfile.log"):
print line
如果您不能将问题限制为基于行的读取,则需要使用块。
这应该有效:
import sys
needle = "needle"
blocks = []
inf = sys.stdin
if len(sys.argv) == 2:
inf = open(sys.argv[1])
while True:
block = inf.read()
blocks.append(block)
if len(blocks) >= 2:
data = "".join((blocks[-2], blocks[-1]))
else:
data = blocks[-1]
# attention, this needs to be changed if you are interested
# in *all* matches separately, not if there was any match ata all
if needle in data:
print "found"
blocks = []
blocks[:-2] = []
if block == "":
break
挑战在于确保您匹配针,即使它被两个块边界分开。
据我所知,Python 函数列表中没有“tail”等价物。解决方案是使用 tell() (获取文件大小)和 read() 来计算结尾行。
这篇博文(不是我写的)有写出来的功能,看起来很适合我! http://www.manugarg.com/2007/04/real-tailing-in-python.html
如果您只需要一个简单的 Python 3 解决方案来处理编写的文本文件的行,并且不需要 Windows 支持,那么这对我来说效果很好:
import subprocess
def tailf(filename):
#returns lines from a file, starting from the beginning
command = "tail -n +1 -F " + filename
p = subprocess.Popen(command.split(), stdout=subprocess.PIPE, universal_newlines=True)
for line in p.stdout:
yield line
for line in tailf("logfile"):
#do stuff
它会阻塞等待新行的写入,因此不适合异步使用而无需进行一些修改。
你可以collections.deque
用来实现tail。
来自http://docs.python.org/library/collections.html#deque-recipes ...
def tail(filename, n=10):
'Return the last n lines of a file'
return deque(open(filename), n)
当然,这会读取整个文件内容,但它是一种简洁而简洁的方式来实现 tail。