0

我有一个超过 10GB 的 BZ2 文件。我想在不将其解压缩为临时文件的情况下阅读它(它将超过 50GB)。

使用这种方法:

import bz2, time
t0 = time.time()
time.sleep(0.001) # to avoid / by 0
with bz2.open("F:\test.bz2", 'rb') as f:
    for i, l in enumerate(f):
        if i % 100000 == 0:
            print('%i lines/sec' % (i/(time.time() - t0)))

我每秒只能读取 ~ 250k 行。在一个类似的文件上,首先解压缩,我每秒得到大约 3M 行,即 x10 因子:

with open("F:\test.txt", 'rb') as f:

我认为这不仅是由于固有的解压 CPU 时间(因为解压到临时文件的总时间 + 读取为未压缩文件的总时间远小于此处描述的方法),还可能是由于缺乏缓冲或其他原因。还有其他更快的 Python 实现bz2.open吗?

如何以二进制模式加速 BZ2 文件的读取并循环“行”?(由 分隔\n

注意:目前time to decompress test.bz2 into test.tmp + time to iterate over lines of test.tmp远小于time to iterate over lines of bz2.open('test.bz2'),这可能不应该是这种情况。

链接主题:https ://discuss.python.org/t/non-optimal-bz2-reading-speed/6869

4

2 回答 2

1

你可以BZ2Decompressor用来处理巨大的文件。它以增量方式解压缩数据块,开箱即用:

t0 = time.time()
time.sleep(0.000001)
with open('temp.bz2', 'rb') as fi:
    decomp = bz2.BZ2Decompressor()
    residue = b''
    total_lines = 0
    for data in iter(lambda: fi.read(100 * 1024), b''):
        raw = residue + decomp.decompress(data) # process the raw data and  concatenate residual of the previous block to the beginning of the current raw data block
        residue = b''
        # process_data(current_block) => do the processing of the current data block
        current_block = raw.split(b'\n')
        if raw[-1] != b'\n':
            residue = current_block.pop() # last line could be incomplete
        total_lines += len(current_block)
        print('%i lines/sec' % (total_lines / (time.time() - t0)))
    # process_data(residue) => now finish processing the last line
    total_lines += 1
    print('Final: %i lines/sec' % (total_lines / (time.time() - t0)))

在这里,我读取了一大块二进制文件,将其输入解压缩器并接收一大块解压缩数据。请注意,必须将解压缩的数据块连接起来才能恢复原始数据。这就是为什么最后一个条目需要特殊处理的原因。

在我的实验中,它的运行速度比您使用io.BytesIO(). bz2众所周知,它很慢,所以如果它困扰您,请考虑迁移到snappyor zstandard

bz2关于在 Python中处理所需的时间。使用 Linux 实用程序将文件解压缩为临时文件,然后处理普通文本文件可能是最快的。否则,您将依赖 Python 的bz2.

于 2021-01-29T01:19:47.733 回答
0

这种方法已经比 native 提供了 x2 的改进bz2.open

import bz2, time, io

def chunked_readlines(f):
    s = io.BytesIO()
    while True:
        buf = f.read(1024*1024)
        if not buf:
            return s.getvalue()
        s.write(buf)
        s.seek(0)
        L = s.readlines()
        yield from L[:-1]
        s = io.BytesIO()
        s.write(L[-1])  # very important: the last line read in the 1 MB chunk might be
                        # incomplete, so we keep it to be processed in the next iteration
                        # TODO: check if this is ok if f.read() stopped in the middle of a \r\n?

t0 = time.time()
i = 0
with bz2.open("D:\test.bz2", 'rb') as f:
    for l in chunked_readlines(f):       # 500k lines per second
    # for l in f:                        # 250k lines per second
        i += 1
        if i % 100000 == 0:
            print('%i lines/sec' % (i/(time.time() - t0)))

或许可以做得更好。

如果我们可以使用saa 简单bytes对象而不是io.BytesIO. 但不幸的是,在这种情况下,它的splitlines()行为并不像预期的那样:splitlines() 和迭代打开的文件会给出不同的结果

于 2021-01-17T20:51:52.827 回答