1

我没有意识到这么简单的任务会有多么危险。我们正在尝试流式读取存储在 S3 中的 JSON 文件——我认为我们已经完成了这部分工作。我们的.on('data')回调被调用了,但是 Node 挑选并选择了它想要运行的位——似乎是随机的。

我们设置了一个流阅读器。

stream.on('data', async x => { 
  await saveToDb(x);  // This doesn't await.  It processes saveToDb up until it awaits.
});

有时 db 调用会到达 db——但大多数时候它不会。我得出的结论是 EventEmitter 在异步/等待事件处理程序方面存在问题。只要您的代码是同步的,它似乎就会与您的异步方法一起播放。但是,在您等待的时候,它会随机决定是否实际执行此操作。

它流式传输各种块,我们可以console.log将它们输出并查看数据。但是一旦我们尝试触发等待/异步调用,我们就会停止看到可靠的消息。

我在 AWS Lambda 中运行它,有人告诉我有一些特殊考虑,因为它们显然在某些情况下会停止处理?

我尝试在 IFFY 中包围 await 调用,但这也不起作用。

我错过了什么?有没有办法告诉 JavaScript——“好的,我需要你同步运行这个异步任务。我的意思是——也不要再触发任何事件通知。只是坐在这里等待。”?

4

1 回答 1

1

TL;博士:

  • 使用异步迭代器从流管道的末尾拉取!
  • 不要在任何流代码中使用异步函数!

细节:

生命之谜的秘密async/await似乎被包裹起来了Async Iterators

简而言之,我将一些流连接在一起,最后,我创建了一个异步迭代器来将内容拉出,以便我可以异步调用数据库。ChunkStream 为我做的唯一一件事就是排队多达 1,000 个来调用数据库,而不是为每个项目调用。我是队列新手,所以可能已经有更好的方法了。

// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
const stream = require('stream');
const JSONStream = require('JSONStream');

exports.handler = async (event, context) => {
    // ...
    let bucket, key;
    try {
        bucket = event.Records[0].s3.bucket.name;
        key = event.Records[0].s3.object.key;
        console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
        const parser = JSONStream.parse('*'); // Converts file to JSON objects
        let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
        let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);
        
        let totalProcessed = 0;
        async function processChunk(chunk) {
            let chunkString = JSON.stringify(chunk);
            console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) items to the db.`);
            await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
            totalProcessed += chunk.length;
        }
        
        // Async iterator
        for await (const batch of endStream) {
            // console.log(`Processing batch (${batch.length})`, batch);
            await processChunk(batch);
        }
    } catch (ex) {
        context.fail("stream S3 file failed");
        throw ex;
    }
};

class ChunkStream extends stream.Transform {
    constructor(maxItems, options = {}) {
        options.objectMode = true;
        super(options);
        this.maxItems = maxItems;
        this.batch = [];
    }
    _transform(item, enc, cb) {
        this.batch.push(item);
        if (this.batch.length >= this.maxItems) {
            // console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
            this.push(this.batch);
            // console.log('_transform - Restarting the batch');
            this.batch = [];
        }
        cb();
    }
    _flush(cb) {
        // console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
        if (this.batch.length > 0) {
            this.push(this.batch);
            this.batch = [];
        }
        cb();
    }
}
于 2020-08-12T01:54:05.637 回答