I am working on a Lambda that converts CSV files stored in Bucket-A (source) to NDJSON and moves them to Bucket-B (destination).

The logic below works fine for small files, but my CSV files are expected to be over 200 MB, some around 2.5 GB, and this logic times out even with the Lambda timeout set to its maximum of 15 minutes.

I read an article suggesting that the records be written/appended to a file in the Lambda's /tmp space and that file then uploaded to S3, but /tmp is capped at roughly 512 MB by default. I have also put a rough sketch of the streaming approach I'm considering after my current code below.

Thanks for reading. Any help with this would be greatly appreciated.
import boto3
import ndjson
import csv
from datetime import datetime, timezone
from io import StringIO
import os


def lambda_handler(event, context):
    errMsg = None
    target_resp_list = []
    l_utcdatetime = datetime.utcnow()
    l_timestamp = l_utcdatetime.strftime('%Y%m%d%H%M%S')

    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])

    for record in event["Records"]:
        # Source bucket and key of the new file landed
        source_bucket = record["s3"]["bucket"]["name"]
        source_key = record["s3"]["object"]["key"]
        source_file_name = source_key.split("/")[-1].split(".")[0]

        bucket = s3.Bucket(source_bucket)
        obj = bucket.Object(key=source_key)
        response = obj.get()
        records = StringIO(response['Body'].read().decode())

        # Loop through the CSV records and add them to the response list,
        # adding the snapshot_datetime to each record
        for row in csv.DictReader(records):
            row['source_snapshot_datetime'] = f'{l_utcdatetime}'
            target_resp_list.append(row)

        # The below attributes are used in copying the ndjson file to the destination bucket
        l_target_bucket = os.getenv("TargetBucket")
        l_target_prefix = os.getenv("TargetPrefix")
        l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}.ndjson"

        # Moving the ndjson file to the Snowflake staging bucket
        try:
            s3_client.put_object(Body=ndjson.dumps(target_resp_list),
                                 Bucket=l_target_bucket,
                                 Key=l_target_key)
            print("File moved to destination bucket.")
        except Exception as ex1:
            errMsg = f"Error while copying the file from source to destination bucket - {ex1}"

    # Raise exception in case of copy fail
    if errMsg is not None:
        raise Exception(errMsg)
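
For what it's worth, here is a minimal sketch of the streaming direction I've been thinking about, assuming the same source bucket/key fields from the S3 event and the TargetBucket / TargetPrefix environment variables used above (the stream_csv_to_ndjson helper is just something I made up for illustration, not part of my current handler). The idea is to let csv.DictReader consume the S3 body as a stream and push the converted NDJSON lines out through an S3 multipart upload in roughly 8 MB parts, so neither the whole input nor the whole output ever has to fit in memory or /tmp. I don't know whether this would be fast enough to stay under the 15-minute limit for the 2.5 GB files, which is partly why I'm asking.

import codecs
import csv
import json
from datetime import datetime

import boto3

s3_client = boto3.client("s3")


# Hypothetical helper: stream one CSV object from the source bucket into an
# NDJSON object in the target bucket via a multipart upload, without holding
# the whole file in memory or /tmp.
def stream_csv_to_ndjson(source_bucket, source_key, target_bucket, target_key):
    snapshot = f"{datetime.utcnow()}"
    response = s3_client.get_object(Bucket=source_bucket, Key=source_key)
    # Wrap the StreamingBody so csv.DictReader can consume it line by line
    reader = csv.DictReader(codecs.getreader("utf-8")(response["Body"]))

    mpu = s3_client.create_multipart_upload(Bucket=target_bucket, Key=target_key)
    parts = []
    part_number = 1
    buffered_lines = []
    buffered_bytes = 0
    part_size = 8 * 1024 * 1024  # every part except the last must be >= 5 MB

    def flush_part():
        # Upload whatever NDJSON lines are currently buffered as one part
        nonlocal part_number, buffered_bytes
        if not buffered_lines:
            return
        part = s3_client.upload_part(
            Bucket=target_bucket,
            Key=target_key,
            PartNumber=part_number,
            UploadId=mpu["UploadId"],
            Body="".join(buffered_lines).encode("utf-8"),
        )
        parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
        part_number += 1
        buffered_lines.clear()
        buffered_bytes = 0

    try:
        for row in reader:
            row["source_snapshot_datetime"] = snapshot
            line = json.dumps(row) + "\n"
            buffered_lines.append(line)
            buffered_bytes += len(line)
            if buffered_bytes >= part_size:
                flush_part()
        flush_part()  # final (possibly smaller) part
        s3_client.complete_multipart_upload(
            Bucket=target_bucket,
            Key=target_key,
            UploadId=mpu["UploadId"],
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        # Clean up the partial upload so S3 does not keep the orphaned parts
        s3_client.abort_multipart_upload(
            Bucket=target_bucket, Key=target_key, UploadId=mpu["UploadId"]
        )
        raise

The 8 MB buffer is only there because, as far as I understand, every part of a multipart upload except the last one has to be at least 5 MB; if there is a cleaner way to stream this conversion (or if this still won't scale to 2.5 GB within the timeout), I'd appreciate pointers.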