您可以使用该bz2
包打开 BZ2 编码文件并将它们视为常规文件对象。以二进制方式读/写具有较小的性能优势。假设您的数据是 ASCII 或 UTF-8 并且数据中不需要转义制表符,您可以逐行读取文件,在新时间戳出现时打开并写入输出。
import bz2
import os
outfile = None
date = b""
with bz2.open("file") as fileobj:
for line in filobj:
# get date from, ex. "2021-05-01 00:00:00", timestamp
new_date = line.split(b"\t")[7].split(b" ")[0]
# roll to new file as needed, appending, so existing data not overwritten
if new_date != date:
date = new_date
new_file = f"splitted_file_{new_date}.csv.bz2"
exists = os.path.exists(new_file)
outfile = bz2.open(new_file, "ab")
if not exists:
outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
# write the row
outfile.writeline(line)
if outfile:
outfile.close()
您可以通过管道加快速度。将解密和加密都分配给单独的 bzip2 进程,这些进程将在不同的内核上并行运行。您可以创建管道和文件来在脚本本身中执行此操作,而不是 shell 管道。假设bzip2
您的系统上存在您可以执行以下操作。我添加了tqdm
模块来打印沿途的进度。
#!/usr/bin/env python3
import subprocess as subp
from pathlib import Path
import sys
import tqdm
# TODO: Better command line
try:
in_file_name = Path(sys.argv[1])
except IndexError:
print("usage: unbzcsv.py filename")
exit(1)
# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
out_file = None
date = b""
bzwriter = None
bzfile = None
# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name],
stdin=subp.DEVNULL, stdout=subp.PIPE)
# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)
# read lines and fan out to files
try:
for line in progress:
# get date from, ex. "2021-05-01 00:00:00", timestamp
new_date = line.split(b"\t")[7].split(b" ")[0]
# roll to new file as needed, appending, so existing data not overwritten
if new_date != date:
date = new_date
out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
if bzwriter is not None:
bzwriter.stdin.close()
bzwriter.wait()
bzwriter = None
bzfile.close()
print("\nwriting", out_file_name)
progress.refresh()
bzfile = open(out_file_name, "wb")
bzwriter = subp.Popen(["bzip2", "--compress"],
stdin=subp.PIPE, stdout=bzfile)
# write the row
bzwriter.stdin.write(line)
finally:
bzreader.terminate() # in case of error
if bzwriter:
bzwriter.stdin.close()
bzwriter.wait()
bzfile.close()
bzreader.wait()