我想在 Python 中使用 tail -F 或类似的输出而不阻塞或锁定。我在这里找到了一些非常古老的代码,但我认为现在必须有更好的方法或库来做同样的事情。有人知道吗?
理想情况下,我tail.getNewData()
每次想要更多数据时都可以调用类似的东西。
如果您在 linux 上(因为 windows 不支持在文件上调用 select),您可以将 subprocess 模块与 select 模块一起使用。
import time
import subprocess
import select
f = subprocess.Popen(['tail','-F',filename],\
stdout=subprocess.PIPE,stderr=subprocess.PIPE)
p = select.poll()
p.register(f.stdout)
while True:
if p.poll(1):
print f.stdout.readline()
time.sleep(1)
这会轮询输出管道以获取新数据并在可用时打印它。通常time.sleep(1)
andprint f.stdout.readline()
将被替换为有用的代码。
您可以使用 subprocess 模块而无需额外的 select 模块调用。
import subprocess
f = subprocess.Popen(['tail','-F',filename],\
stdout=subprocess.PIPE,stderr=subprocess.PIPE)
while True:
line = f.stdout.readline()
print line
这也将在添加新行时打印它们,但它会阻塞直到尾部程序关闭,可能使用f.kill()
.
使用sh 模块(pip install sh):
from sh import tail
# runs forever
for line in tail("-f", "/var/log/some_log_file.log", _iter=True):
print(line)
[更新]
由于带有 =True 的 sh.tail_iter
是一个生成器,因此您可以:
import sh
tail = sh.tail("-f", "/var/log/some_log_file.log", _iter=True)
然后你可以“getNewData”:
new_data = tail.next()
请注意,如果尾部缓冲区为空,它将阻塞,直到有更多数据(根据您的问题,不清楚在这种情况下您要做什么)。
[更新]
如果您将 -f 替换为 -F,这将有效,但在 Python 中它将被锁定。如果可能的话,我会对拥有一个可以在需要时调用以获取新数据的函数更感兴趣。– 伊莱
容器生成器将尾调用放置在 while True 循环中并捕获最终的 I/O 异常将具有与 -F 几乎相同的效果。
def tail_F(some_file):
while True:
try:
for line in sh.tail("-f", some_file, _iter=True):
yield line
except sh.ErrorReturnCode_1:
yield None
如果文件变得不可访问,生成器将返回 None。但是,如果文件可访问,它仍然会阻塞,直到有新数据。我还不清楚在这种情况下你想做什么。
Raymond Hettinger 的方法似乎相当不错:
def tail_F(some_file):
first_call = True
while True:
try:
with open(some_file) as input:
if first_call:
input.seek(0, 2)
first_call = False
latest_data = input.read()
while True:
if '\n' not in latest_data:
latest_data += input.read()
if '\n' not in latest_data:
yield ''
if not os.path.isfile(some_file):
break
continue
latest_lines = latest_data.split('\n')
if latest_data[-1] != '\n':
latest_data = latest_lines[-1]
else:
latest_data = input.read()
for line in latest_lines[:-1]:
yield line + '\n'
except IOError:
yield ''
如果文件变得不可访问或没有新数据,此生成器将返回 ''。
[更新]
倒数第二个答案会在数据用完时绕到文件顶部。– 伊莱
我认为第二个将在尾部进程结束时输出最后十行,即-f
每当出现 I/O 错误时。在我能想到的类 Unix 环境中的大多数情况下,这种tail --follow --retry
行为与此相差不远。
也许如果你更新你的问题来解释你的真正目标是什么(你想模仿 tail --retry 的原因),你会得到一个更好的答案。
最后一个答案实际上并不跟随尾部,而只是读取运行时可用的内容。– 伊莱
当然,tail 默认会显示最后 10 行...您可以使用 file.seek 将文件指针定位在文件末尾,我将留一个正确的实现作为练习给读者。
恕我直言,file.read() 方法比基于子流程的解决方案要优雅得多。
实际上,文件的唯一可移植方式tail -f
似乎是从文件中读取并重试(在 a 之后sleep
)如果read
返回 0。tail
各种平台上的实用程序使用特定于平台的技巧(例如kqueue
在 BSD 上)来有效地永久跟踪文件无需sleep
.
因此,tail -f
纯粹在 Python 中实现一个好的可能不是一个好主意,因为您将不得不使用最小公分母实现(而不求助于特定于平台的 hack)。使用简单subprocess
的打开并在单独的线程中遍历行,您可以轻松地在 Python 中tail -f
实现非阻塞操作。tail
示例实现:
import threading, Queue, subprocess
tailq = Queue.Queue(maxsize=10) # buffer at most 100 lines
def tail_forever(fn):
p = subprocess.Popen(["tail", "-f", fn], stdout=subprocess.PIPE)
while 1:
line = p.stdout.readline()
tailq.put(line)
if not line:
break
threading.Thread(target=tail_forever, args=(fn,)).start()
print tailq.get() # blocks
print tailq.get_nowait() # throws Queue.Empty if there are no lines to read
将Ijaz Ahmad Khan 的答案改编为仅在完全写入时才产生行(行以换行符结尾)给出了一个没有外部依赖关系的 pythonic 解决方案:
def follow(file, sleep_sec=0.1) -> Iterator[str]:
""" Yield each line from a file as they are written.
`sleep_sec` is the time to sleep after empty reads. """
line = ''
while True:
tmp = file.readline()
if tmp is not None:
line += tmp
if line.endswith("\n"):
yield line
line = ''
else if sleep_sec:
time.sleep(sleep_sec)
if __name__ == '__main__':
with open("test.txt", 'r') as file:
for line in follow(file):
print(line, end='')
所以,这来得很晚,但我又遇到了同样的问题,现在有一个更好的解决方案。只需使用pygtail:
Pygtail 读取尚未读取的日志文件行。它甚至会处理已轮换的日志文件。基于 logcheck 的 logtail2 ( http://logcheck.org )
所有使用 tail -f 的答案都不是 Pythonic。
这是pythonic方式:(不使用外部工具或库)
def follow(thefile):
while True:
line = thefile.readline()
if not line or not line.endswith('\n'):
time.sleep(0.1)
continue
yield line
if __name__ == '__main__':
logfile = open("run/foo/access-log","r")
loglines = follow(logfile)
for line in loglines:
print(line, end='')
理想情况下,我有类似 tail.getNewData() 的东西,每次我想要更多数据时我都可以调用它
我们已经有了一个,它非常好。只要您需要更多数据, 只需调用f.read() 即可。它将从上次读取停止的地方开始读取,并将读取数据流的末尾:
f = open('somefile.log')
p = 0
while True:
f.seek(p)
latest_data = f.read()
p = f.tell()
if latest_data:
print latest_data
print str(p).center(10).center(80, '=')
要逐行阅读,请使用f.readline()。有时,正在读取的文件将以部分读取的行结束。使用f.tell()查找当前文件位置并使用f.seek()将文件指针移回不完整行的开头来处理这种情况。有关工作代码,请参阅此 ActiveState 配方。
您可以使用“tailer”库:https ://pypi.python.org/pypi/tailer/
它可以选择获取最后几行:
# Get the last 3 lines of the file
tailer.tail(open('test.txt'), 3)
# ['Line 9', 'Line 10', 'Line 11']
它也可以跟随一个文件:
# Follow the file as it grows
for line in tailer.follow(open('test.txt')):
print line
如果一个人想要类似尾巴的行为,那似乎是一个不错的选择。
另一种选择是tailhead
提供 Python 版本的库tail
以及head
可在您自己的模块中使用的实用程序和 API。
最初基于该tailer
模块,它的主要优点是能够通过路径跟踪文件,即它可以处理重新创建文件时的情况。此外,它还针对各种边缘情况进行了一些错误修复。
Python 是“包括电池” - 它有一个很好的解决方案:https ://pypi.python.org/pypi/pygtail
读取尚未读取的日志文件行。记住上次完成的地方,并从那里继续。
import sys
from pygtail import Pygtail
for line in Pygtail("some.log"):
sys.stdout.write(line)
您也可以使用“AWK”命令。
更多信息请见:http
://www.unix.com/shell-programming-scripting/41734-how-print-specific-lines-awk.html
awk 可用于尾随最后一行、最后几行或任何一行文件。
这可以从 python 调用。
如果您在 linux 上,您可以通过以下方式在 python 中实现非阻塞实现。
import subprocess
subprocess.call('xterm -title log -hold -e \"tail -f filename\"&', shell=True, executable='/bin/csh')
print "Done"
# -*- coding:utf-8 -*-
import sys
import time
class Tail():
def __init__(self, file_name, callback=sys.stdout.write):
self.file_name = file_name
self.callback = callback
def follow(self, n=10):
try:
# 打开文件
with open(self.file_name, 'r', encoding='UTF-8') as f:
# with open(self.file_name,'rb') as f:
self._file = f
self._file.seek(0, 2)
# 存储文件的字符长度
self.file_length = self._file.tell()
# 打印最后10行
self.showLastLine(n)
# 持续读文件 打印增量
while True:
line = self._file.readline()
if line:
self.callback(line)
time.sleep(1)
except Exception as e:
print('打开文件失败,囧,看看文件是不是不存在,或者权限有问题')
print(e)
def showLastLine(self, n):
# 一行大概100个吧 这个数改成1或者1000都行
len_line = 100
# n默认是10,也可以follow的参数传进来
read_len = len_line * n
# 用last_lines存储最后要处理的内容
while True:
# 如果要读取的1000个字符,大于之前存储的文件长度
# 读完文件,直接break
if read_len > self.file_length:
self._file.seek(0)
last_lines = self._file.read().split('\n')[-n:]
break
# 先读1000个 然后判断1000个字符里换行符的数量
self._file.seek(-read_len, 2)
last_words = self._file.read(read_len)
# count是换行符的数量
count = last_words.count('\n')
if count >= n:
# 换行符数量大于10 很好处理,直接读取
last_lines = last_words.split('\n')[-n:]
break
# 换行符不够10个
else:
# break
# 不够十行
# 如果一个换行符也没有,那么我们就认为一行大概是100个
if count == 0:
len_perline = read_len
# 如果有4个换行符,我们认为每行大概有250个字符
else:
len_perline = read_len / count
# 要读取的长度变为2500,继续重新判断
read_len = len_perline * n
for line in last_lines:
self.callback(line + '\n')
if __name__ == '__main__':
py_tail = Tail('test.txt')
py_tail.follow(1)