我正在并行运行多个 Python 进程,如下所示:
find /path/to/logfiles/*.gz | xargs -n1 -P4 python logparser.py
各进程的输出偶尔会相互交错、被打乱。
输出流是无缓冲的,每次写入的大小都小于系统(OS X 10.8.2,Python 2.7.2)默认定义的 PIPE_BUF(512 字节),所以我认为每次写入应该是原子的,但输出偶尔还是会被打乱。我一定是遗漏了什么,任何建议都将不胜感激。
谢谢。
脚本的简化骨架是:
import argparse
import csv
import gzip
import sys
class UnbufferedWriter(object):
    """Wrap a file-like object so that every write is flushed immediately.

    Adapted from
    http://mail.python.org/pipermail/tutor/2003-November/026645.html

    Any attribute not defined on the wrapper (``name``, ``fileno``,
    ``close``, ...) is looked up on the wrapped stream.
    """

    def __init__(self, stream):
        # The wrapped file-like object; it must provide write() and flush().
        self.stream = stream

    def write(self, data):
        """Write *data* to the wrapped stream, then flush it right away."""
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, lines):
        """Write each item of *lines*, flushing after every item.

        Without this override, ``writelines`` would be delegated to the
        wrapped stream by ``__getattr__`` and would bypass the flush.
        """
        for line in lines:
            self.write(line)

    def __getattr__(self, attr):
        # Only invoked for attributes not found on the wrapper itself.
        return getattr(self.stream, attr)
def parse_records(infile):
    """Yield one record per log line of *infile* that passes the filter.

    If ``infile.name`` ends with ``.gz`` the lines are read through
    ``gzip.GzipFile`` wrapping the already-open *infile*; otherwise *infile*
    is iterated directly.

    The yielded records are consumed with ``.get(...)`` by ``main``, so they
    are presumably dicts keyed by field name -- confirm against the real
    matching code.
    """
    if infile.name.endswith('.gz'):
        # Decompress on the fly from the open file object.
        lines = gzip.GzipFile(fileobj=infile)
    else:
        lines = infile
    for line in lines:
        # match lines with regex and filter out on some conditions.
        # NOTE(review): the regex matching / filtering is elided in this
        # skeleton; ``line_as_dict`` is not defined anywhere visible here.
        yield line_as_dict
def main(infile, outfile):
    """Write selected fields of every parsed record in *infile* as CSV rows.

    Each row goes to *outfile* through ``csv.DictWriter`` with every field
    quoted (``csv.QUOTE_ALL``). A field absent from a record is written as
    an empty string. No header row is emitted.
    """
    columns = ['remote_addr', 'time', 'request_time', 'request', 'status']
    csv_out = csv.DictWriter(outfile, columns, quoting=csv.QUOTE_ALL)
    for rec in parse_records(infile):
        # One writerow() call per record, with keys in the same order as columns.
        csv_out.writerow({col: rec.get(col, '') for col in columns})
if __name__ == '__main__':
    # Relies on the module-level ``import sys`` for the stdin/stdout defaults.
    parser = argparse.ArgumentParser()
    parser.add_argument('infile', nargs='?', type=argparse.FileType('r'),
                        default=sys.stdin)
    # bufsize 0 requests an unbuffered file when an explicit path is given.
    # The sys.stdout default is not opened by FileType, so whichever stream
    # we end up with is wrapped in UnbufferedWriter to flush every write.
    parser.add_argument('outfile', nargs='?', type=argparse.FileType('w', 0),
                        default=sys.stdout)
    pargs = parser.parse_args()
    # NOTE: POSIX only guarantees that a write() of at most PIPE_BUF bytes is
    # not interleaved with other writers when the destination is a pipe or
    # FIFO. A terminal or a regular file shared by several processes gets no
    # such guarantee, even with unbuffered, flushed writes.
    main(pargs.infile, UnbufferedWriter(pargs.outfile))