3

我正在执行多个 python 进程,如下所示:

find /path/to/logfiles/*.gz | xargs -n1 -P4 python logparser.py

并且输出偶尔会被打乱。

输出流是无缓冲的,写入的大小小于默认系统(osx 10.8.2,python 2.7.2)定义的 512 字节的 PIPE_BUF,所以我相信写入应该是原子的,但输出偶尔会被打乱。我一定会遗漏一些东西,任何建议都将不胜感激。

谢谢。

脚本的简化骨架是:

import argparse
import csv
import gzip


class class UnbufferedWriter(object):
    """Unbuffered Writer from 
       http://mail.python.org/pipermail/tutor/2003-November/026645.html

    """

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)


def parse_records(infile):
    if infile.name.endswith('.gz'):
        lines = gzip.GzipFile(fileobj=infile)
    else:
        lines = infile

    for line in lines:
        # match lines with regex and filter out on some conditions.
        yield line_as_dict

def main(infile, outfile):
    fields = ['remote_addr', 'time', 'request_time', 'request', 'status']
    writer = csv.DictWriter(outfile, fields, quoting=csv.QUOTE_ALL)

    for record in parse_records(infile):
        row_as_dict = dict(
            remote_addr=record.get('remote_addr', ''),
            time=record.get('time', ''),
            request_time=record.get('request_time', ''),
            request=record.get('request', ''),
            status=record.get('status', '')
        )
        writer.writerow(row_as_dict)

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin)
    parser.add_argument('outfile', nargs='?', type=argparse.FileType('w', 0), default=sys.stdout)

    pargs = parser.parse_args()
    pargs.outfile = UnbufferedWriter(pargs.outfile)

    main(pargs.infile, pargs.outfile)
4

1 回答 1

3

您可能要考虑使用GNU Parallel。默认情况下,输出会被缓冲,直到实例完成运行:

在运行输出数据的作业时,您通常不希望多个作业的输出一起运行。GNU 并行默认对每个作业的输出进行分组,因此在作业完成时会打印输出。如果您希望在作业运行时打印输出,您可以使用 -u。

我相信运行脚本的最佳方式是 vai:

find /path/to/logfiles/*.gz | parallel python logparser.py

或者

parallel python logparser.py ::: /path/to/logfiles/*.gz

您可以使用标志指定要运行的进程数-j,即-j4.

Parallel 的好处是它支持输入参数的笛卡尔积。例如,如果您想为每个文件迭代一些额外的参数,您可以使用:

parallel python logparser.py ::: /path/to/logfiles/*.gz ::: 1 2 3

这将导致跨多个进程运行以下内容:

python logparser.py /path/to/logfiles/A.gz 1
python logparser.py /path/to/logfiles/A.gz 2
python logparser.py /path/to/logfiles/A.gz 3
python logparser.py /path/to/logfiles/B.gz 1
python logparser.py /path/to/logfiles/B.gz 2
python logparser.py /path/to/logfiles/B.gz 3
...

祝你好运!

于 2013-01-16T22:44:03.653 回答