运行时环境是节点 14(AWS Lambda 函数)。
S3 存储桶和 Lambda 函数在同一个区域,我已经确认 Lambda 函数能够从 S3 获取对象(即权限似乎不是问题)。
Lambda 在将对象(一个非常简单的 CSV 文件)放入 S3 存储桶时触发。CloudWatch 日志流中不会出现任何错误或异常。
包.json
{
"dependencies": {
"@fast-csv/parse": "4.3.6"
}
}
index.js
const aws = require('aws-sdk');
const s3 = new aws.S3({region: 'us-east-2'});
const fs = require('fs');
const csv = require('@fast-csv/parse');
exports.handler = async (event, context) => {
const bucket = event.Records[0].s3.bucket.name;
const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const params = {
Bucket: bucket,
Key: key,
};
const file = await s3.getObject(params).promise();
fs.createReadStream(file.Body).pipe(csv.parse())
.on('error', (error) => console.error(error))
.on('data', (row) => console.log(row))
.on('end', (rowCount) => console.log(`Parsed ${rowCount} rows`));
};
我还尝试了以下变体,结果相同:
index.js (variant)
const aws = require('aws-sdk');
const s3 = new aws.S3({region: 'us-east-2'});
const fs = require('fs');
const csv = require('fast-csv');
exports.handler = async (event, context) => {
const bucket = event.Records[0].s3.bucket.name;
const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const params = {
Bucket: bucket,
Key: key,
};
const file = await s3.getObject(params).promise();
const stream = fs.createReadStream(file.Body);
csv.parseStream(stream)
.on('data', (data) => {
console.info('Data: ' + JSON.stringify(data));
})
.on('data-invalid', (data) => {
console.error('Invalid batch row ' + data);
})
.on('end', () => {
console.info('End of Stream');
})
.on('error', (error) => {
let message = "Error in csv stream processing";
console.error(message, ":", error);
}
);
};
注意:我已经尝试过简单地做await s3.getObject(params).createReadStream()
,但这会导致undefined
通过promise()
首先获取对象数据来获取对象。
我已经为此苦苦挣扎了几个小时,因此不胜感激。谢谢!