2

在 Python 方面,我有点初学者,但我在学校的一个项目需要我在这个 reddit 流行数据集上执行分类算法。这些文件是巨大的 .zst 文件,可以在这里找到:https ://files.pushshift.io/reddit/submissions/ 无论如何,我只是不确定如何将它提取到数据库中,因为我们已经完成了任务到目前为止,我只使用了 .csv 数据集,我可以轻松地将其放入 pandas 数据框中。我偶然发现了另一篇文章,并尝试使用代码:

    def transform_zst_file(self,infile):
        zst_num_bytes = 2**22
        lines_read = 0
        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(infile) as reader:
            previous_line = ""
            while True:
                chunk = reader.read(zst_num_bytes)
                if not chunk:
                    break
                string_data = chunk.decode('utf-8')
                lines = string_data.split("\n")
                for i, line in enumerate(lines[:-1]):
                    if i == 0:
                        line = previous_line + line
                    self.appendData(line, self.type)
                    lines_read += 1
                    if self.max_lines_to_read and lines_read >= self.max_lines_to_read:
                        return
                previous_line = lines[-1]

但是我不完全确定如何将其放入熊猫数据框中,或者如果文件太大,则仅将一定百分比的数据点放入数据框中。任何帮助将不胜感激!

以下代码只会在我每次尝试运行它时使我的计算机崩溃:

import zstandard as zstd  
your_filename = "..." 
with open(your_filename, "rb") as f:     
    data = f.read()  

dctx = zstd.ZstdDecompressor() 
decompressed = dctx.decompress(data)

可能是由于文件的大小太大,有没有办法将这个文件的一部分提取到 pandas 数据框中?

4

2 回答 2

3

该文件已使用压缩库 Zstandard ( https://github.com/facebook/zstd ) 进行压缩。

对您来说最简单的事情可能是使用安装 python-zstandard ( https://pypi.org/project/zstandard/ )

pip install zstandard

然后在 python 脚本中运行类似

import zstandard as zstd

your_filename = "..."
with open(your_filename, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data)

现在您可以直接使用解压后的数据,也可以将其写入某个文件,然后将其加载到 pandas。祝你好运!

于 2020-04-06T20:27:44.173 回答
0

我偶然发现了一个类似的由zst转储组成的 Reddit 数据集。为了迭代您的 zst 文件的内容,我使用了以下代码,您可以将其作为脚本运行:

import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = reader.read(2**27).decode()
            if not chunk:
                break
            lines = (buffer + chunk).split("\n")

            for line in lines[:-1]:
                yield line, file_handle.tell()

            buffer = lines[-1]
        reader.close()


if __name__ == "__main__":
    file_path = sys.argv[1]
    file_size = os.stat(file_path).st_size
    file_lines = 0
    file_bytes_processed = 0
    created = None
    field = "subreddit"
    value = "wallstreetbets"
    bad_lines = 0
    try:
        for line, file_bytes_processed in read_lines_zst(file_path):
            try:
                obj = json.loads(line)
                created = datetime.utcfromtimestamp(int(obj['created_utc']))
                temp = obj[field] == value
            except (KeyError, json.JSONDecodeError) as err:
                bad_lines += 1
            file_lines += 1
            if file_lines % 100000 == 0:
                log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
    except Exception as err:
        log.info(err)

    log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
于 2021-10-18T07:02:21.037 回答