0

我有一个s3://my-bucket/in.tsv.gz要加载和处理的大文件,将其处理后的版本写回 s3 输出文件s3://my-bucket/out.tsv.gz

  1. 如何在in.tsv.gz不将所有文件加载到内存的情况下直接从 s3 简化(它不适合内存)
  2. 如何将处理后的 gzipped 流直接写入 s3?

在下面的代码中,我展示了我是如何考虑从 s3 加载输入 gzipped 数据帧的,以及.tsv如果它位于本地,我将如何编写它bucket_dir_local = ./

import pandas as pd
import s3fs
import os
import gzip
import csv
import io

bucket_dir = 's3://my-bucket/annotations/'
df = pd.read_csv(os.path.join(bucket_dir, 'in.tsv.gz'), sep='\t', compression="gzip")

bucket_dir_local='./'
# not sure how to do it with an s3 path
with gzip.open(os.path.join(bucket_dir_local, 'out.tsv.gz'), "w") as f:
    with io.TextIOWrapper(f, encoding='utf-8') as wrapper:
        w = csv.DictWriter(wrapper, fieldnames=['test', 'testing'], extrasaction="ignore")
        w.writeheader()
        for index, row in df.iterrows():
            my_dict = {"test": index, "testing": row[6]}
            w.writerow(my_dict)

编辑smart_open看起来像是要走的路。

4

2 回答 2

2

要下载文件,您可以直接在 python 中流式传输 S3 对象。我建议阅读整篇文章,但其中的一些关键内容

import boto3

s3 = boto3.client('s3', aws_access_key_id='mykey', aws_secret_access_key='mysecret') # your authentication may vary
obj = s3.get_object(Bucket='my-bucket', Key='my/precious/object')

import gzip

body = obj['Body']

with gzip.open(body, 'rt') as gf:
    for ln in gf:
        process(ln)

不幸的是,S3 不支持真正的流输入,但是这个 SO 答案有一个实现,可以将文件分块并将每个块发送到 S3。虽然不是“真正的流”,但它可以让您上传大文件,而无需将整个内容保存在内存中

于 2020-11-30T04:51:52.597 回答
2

这是一个从 s3 读取文件并将其写回 s3 的虚拟示例smart_open

from smart_open import open
import os

bucket_dir = "s3://my-bucket/annotations/"

with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
    with open(
        os.path.join(bucket_dir, "out.tsv.gz"), "wb"
    ) as fout:
        for line in fin:
            l = [i.strip() for i in line.decode().split("\t")]
            string = "\t".join(l) + "\n"
            fout.write(string.encode())                                    
于 2020-12-02T06:44:56.843 回答