0

我有大量没有任何标题的bz2格式化文件(每个)。30GB我可以500M使用以下内容轻松地将它们拆分为每种尺寸pileline

bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip > $FILE.csv.bz2' - splitted_file-

但是我无法添加['a' 'b' 'c' 'd' 'e' 'f' 'timestamp']要包含每个拆分bz2文件的标题。

更重要的是,我想不基于 拆分文件500M,而是希望bz2每天根据数据中的内容拆分文件(例如:splitted_file_2021-01-01.csv.bz2和) 。splitted_file_2021-01-02.csv.bz2timestamp

数据是制表符分隔的文本,如下所示(没有标题,需要添加它们):

19252547212 1   3041    2   1   74.18   1.8504  2021-05-01 00:00:00
19252547213 1   5055    2   1   0       0       2021-05-01 00:00:00
19252547214 1   5073    1   1   53.81   0.1836  2021-05-01 00:00:00
4

1 回答 1

1

您可以使用该bz2包打开 BZ2 编码文件并将它们视为常规文件对象。以二进制方式读/写具有较小的性能优势。假设您的数据是 ASCII 或 UTF-8 并且数据中不需要转义制表符,您可以逐行读取文件,在新时间戳出现时打开并写入输出。

import bz2
import os

outfile = None
date = b""

with bz2.open("file") as fileobj:
    for line in filobj:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            new_file = f"splitted_file_{new_date}.csv.bz2"
            exists = os.path.exists(new_file)
            outfile = bz2.open(new_file, "ab")
            if not exists:
                outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
        # write the row
        outfile.writeline(line)
if outfile:
    outfile.close()

您可以通过管道加快速度。将解密和加密都分配给单独的 bzip2 进程,这些进程将在不同的内核上并行运行。您可以创建管道和文件来在脚本本身中执行此操作,而不是 shell 管道。假设bzip2您的系统上存在您可以执行以下操作。我添加了tqdm模块来打印沿途的进度。

#!/usr/bin/env python3

import subprocess as subp
from pathlib import Path
import sys
import tqdm

# TODO: Better command line
try:
    in_file_name = Path(sys.argv[1])
except IndexError:
    print("usage: unbzcsv.py filename")
    exit(1)

# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
out_file = None
date = b""
bzwriter = None
bzfile = None

# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name], 
        stdin=subp.DEVNULL, stdout=subp.PIPE)

# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)

# read lines and fan out to files
try:
    for line in progress:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
            if bzwriter is not None:
                bzwriter.stdin.close()
                bzwriter.wait()
                bzwriter = None
                bzfile.close()
            print("\nwriting", out_file_name)
            progress.refresh()
            bzfile = open(out_file_name, "wb")
            bzwriter = subp.Popen(["bzip2", "--compress"],
                    stdin=subp.PIPE, stdout=bzfile)
        # write the row
        bzwriter.stdin.write(line)
finally:
    bzreader.terminate() # in case of error
    if bzwriter:
        bzwriter.stdin.close()
        bzwriter.wait()
        bzfile.close()
    bzreader.wait()
于 2021-10-09T23:35:39.257 回答