TL;DR:
- Use an async iterator to pull from the end of your stream pipeline!
- Don't use async functions anywhere in your stream code!
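To make that second bullet concrete, here is the kind of code this steers you away from, next to the pattern from the first bullet. This is only a sketch: `source` stands for any object-mode readable stream, and `saveItem` is a hypothetical async database call.

// Anti-pattern: the stream never awaits the async handler, so items keep
// flowing while db calls pile up (no backpressure), and a rejected promise
// here becomes an unhandled rejection.
source.on('data', async (item) => {
  await saveItem(item);
});

// Pattern: pull from the readable end with an async iterator; each awaited
// db call pauses the stream until it resolves.
for await (const item of source) {
  await saveItem(item);
}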
Details:
The secret to this whole async/await mystery seems to be wrapped up in Async Iterators!
In short, I pipe a few streams together, and at the end I create an async iterator to pull the content out so that I can call the database asynchronously. The only thing ChunkStream does for me is queue up to 1,000 items at a time to send to the database in one call, instead of calling it for every single item. I'm new to queues, so there may already be a better way to do this (there's a sketch of one alternative after the ChunkStream class below).
// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
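// Note (an assumption about this data set: the records contain numeric ids
// larger than Number.MAX_SAFE_INTEGER): native JSON.parse silently rounds
// such values, e.g. JSON.parse('{"id":9007199254740993}').id comes back as
// 9007199254740992. json-bigint parses them losslessly, and the matching
// stringify writes them back out unchanged.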
const stream = require('stream');
const JSONStream = require('JSONStream');
exports.handler = async (event, context) => {
  // ...
  let bucket, key;
  try {
    bucket = event.Records[0].s3.bucket.name;
    key = event.Records[0].s3.object.key;
    console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
    const parser = JSONStream.parse('*'); // Converts file to JSON objects
    let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
    let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);

    let totalProcessed = 0;
    async function processChunk(chunk) {
      let chunkString = JSON.stringify(chunk);
      console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) to the db.`);
      await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
      totalProcessed += chunk.length;
    }

    // Async iterator
    for await (const batch of endStream) {
      // console.log(`Processing batch (${batch.length})`, batch);
      await processChunk(batch);
    }
  } catch (ex) {
    context.fail("stream S3 file failed");
    throw ex;
  }
};
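One caveat on the error handling above: `.pipe()` does not forward errors between streams, so a failure in the S3 read stream or the parser is not guaranteed to reject the `for await` loop and land in the catch block. A variant built on `stream.pipeline`, which tears the whole chain down on failure so the loop's iterator rejects, could look roughly like this (a sketch only; it reuses the handler's processChunk and the ChunkStream class below):

const { pipeline } = require('stream');

const chunkStream = new ChunkStream(1000);
pipeline(
  s3.getObject({ Bucket: bucket, Key: key }).createReadStream(),
  JSONStream.parse('*'),
  chunkStream,
  (err) => { if (err) console.error('Pipeline failed:', err); }
);

// On a pipeline error, chunkStream is destroyed with that error and this
// loop throws, so the surrounding try/catch still sees it.
for await (const batch of chunkStream) {
  await processChunk(batch);
}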
class ChunkStream extends stream.Transform {
  constructor(maxItems, options = {}) {
    options.objectMode = true;
    super(options);
    this.maxItems = maxItems;
    this.batch = [];
  }

  _transform(item, enc, cb) {
    this.batch.push(item);
    if (this.batch.length >= this.maxItems) {
      // console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
      this.push(this.batch);
      // console.log('_transform - Restarting the batch');
      this.batch = [];
    }
    cb();
  }

  _flush(cb) {
    // console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
    if (this.batch.length > 0) {
      this.push(this.batch);
      this.batch = [];
    }
    cb();
  }
}
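To see ChunkStream's batching on its own, here is a quick standalone check (a sketch with made-up values; Readable.from needs Node 12.3 or newer):

const { Readable } = require('stream');

(async () => {
  const source = Readable.from([1, 2, 3, 4, 5]); // object mode by default
  for await (const batch of source.pipe(new ChunkStream(2))) {
    console.log(batch); // [ 1, 2 ] then [ 3, 4 ] then the flushed [ 5 ]
  }
})();

On the "there may already be a better way" point: the same batching can also be written as a plain async generator instead of a Transform subclass. This sketch (with hypothetical names) consumes any async iterable, so it fits the same for-await style as the handler above:

async function* inBatches(source, maxItems) {
  let batch = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= maxItems) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // same job as _flush above
}

The trade-off is that the batching happens where you iterate rather than inside the .pipe() chain, so it replaces ChunkStream rather than sitting alongside it.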