0

我正在研究一个 lambda,它将存储在 Bucket-A(source) 中的 CSV 文件转换为 NDJSON 并将其移动到 Bucket-B(destination)

下面的逻辑对于小文件可以正常工作,但我的 CSV 文件预计是超过 200 MB,有些大约 2.5GB,即使 lambda 设置为最大超时,此逻辑也会超时。

我在看一篇文章,说的是使用 lambda tmp 空间直接将信息写入/附加到文件中,该文件可以上传到 S3,但 tmp 空间的最大大小约为 ~500 MB

感谢您通读。
非常感谢任何解决此问题的帮助。

import boto3
import ndjson
import csv
from datetime import datetime, timezone
from io import StringIO
import os

def lambda_handler(event, context):
    errMsg = None
    target_resp_list = []
    l_utcdatetime = datetime.utcnow()
    l_timestamp = l_utcdatetime.strftime('%Y%m%d%H%M%S')
    
    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])
    
    for record in event["Records"]:
        
        # Source bucket and key of the new file landed
        source_bucket = record["s3"]["bucket"]["name"]
        source_key = record["s3"]["object"]["key"]
        source_file_name = source_key.split("/")[-1].split(".")[0]
        
        bucket = s3.Bucket(source_bucket)
        obj = bucket.Object(key=source_key)
        response = obj.get()
        records = StringIO(response['Body'].read().decode())

        # loop through the csv records and add it to the response list, while adding the snapshot_datetime to each record
        for row in csv.DictReader(records):
            row['source_snapshot_datetime'] = f'{l_utcdatetime}'
            target_resp_list.append(row)

        # The below attributes are used in copying the ndjson file to the destination bucket
        l_target_bucket = os.getenv("TargetBucket")
        l_target_prefix = os.getenv("TargetPrefix")
        l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}.ndjson"

        # Moving the ndjson file to Snowflake staging bucket
        try:
            s3_client.put_object(Body=ndjson.dumps(target_resp_list), 
                Bucket=l_target_bucket, 
                Key=l_target_key
            )
            print("File moved to destination bucket.")
        except Exception as ex1:
            errMsg = f"Error while copying the file from source to destination bucket - {ex1}"
        
        # Raise exception in case of copy fail
        if errMsg is not None:
            raise Exception(errMsg)
4

2 回答 2

1

Lambda 每次执行最多可以运行 15 分钟。我建议首先检查在本地首先处理文件的最坏情况。如果您期望大文件尝试将 lambda 内存提高到满足您要求的最大可行值。

提示:

  • 尝试压缩文件,按 GB 压缩的 CSV 文件减少到兆字节,文本可以压缩很多。
  • 尝试提前拆分工作,如果这个巨大的文件可以被一个 lambda 拆分并由另一个处理,那么你不会太在意执行 timo-out。
于 2020-08-29T09:59:02.510 回答
0

把这个留给以后可能会来看的人。

我认为问题在于 ndjson.dumps 需要花费大量时间来转换列表并推送到 S3,因此我所做的是使用计数器来分块源记录 - 每个 50K,然后调用 dumpChunkToS3(),这基本上是逻辑转储到 S3。
需要一个额外的条件语句,因为行/记录的数量甚至不会除以 50K(在我的情况下是租用的)

# loop through the csv records and add it to the response list, while adding the snapshot_datetime to the record
for row in csv.DictReader(records):
    row['source_snapshot_datetime'] = f'{l_utcdatetime}'
    rowscnt += 1
    target_resp_list.append(row)
    if rowscnt == 50000:
        chunk_id += 1
        respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
        rowscnt = 0
        del target_resp_list[:]

if rowscnt > 0:
    chunk_id += 1
    respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
于 2020-08-31T16:06:48.993 回答