0

在使用 firehose 和 dynamoDB 函数时,我无法理解如何在 node.js 中可靠地使用异步。例如:

async function dynamoDBToCSV(record) {
    ... 
    try {
        const res = await firehose.putRecordBatch(params).promise();
        console.log('res = ', res);
    } catch (err) { 
        console.log('err = ', err);
    }
}

调用函数的代码:

let dataCount = 0;
let items;

do {
    let scanResults = [];
    items =  await docClient.scan(params).promise();
    items.Items.forEach(async(item) => {
        
        dataCount++;

        try {
            if (dataCount === 3) { // test on one packet. 
                await dynamoDBToCSV(item);
            }
            
        } catch(err) { 
            console.log('Failed at item number ' + dataCount);
            return err;
        }
    });
     
    params.ExclusiveStartKey  = items.LastEvaluatedKey;
// } while (typeof items.LastEvaluatedKey != "undefined");
} while (false); // test mode. Runs on one page.

console.log('Interventions in DB = ' + dataCount);  

这条线await docClient.scan(params).promise();似乎工作正常。它完成了我期望调用该函数并等待响应准备好的操作。但是,await firehose.putRecordBatch(params).promise();似乎不起作用。输出只是打印'Interventions in DB = ...',完全跳过了消防软管部分。

我似乎经常与 node.js lambdas 中的异步函数作斗争。似乎没有一种可靠的标准方法。

更新

我尝试使用一个小功能:

async function sendToFirehose(params) {
    return Promise((resolve, reject) => {
        firehose.putRecordBatch(params, (err, data) => {
            if (err) return reject(err);
            return resolve(data);
        });
    });
}

...
await sendToFirehose(params);

但得到错误:

{
"errorType": "Runtime.UnhandledPromiseRejection",
"errorMessage": "TypeError: undefined is not a promise",
"reason": {
    "errorType": "TypeError",
    "errorMessage": "undefined is not a promise",
    "stack": [
        "TypeError: undefined is not a promise",
        "    at Promise (<anonymous>)",
        "    at sendToFirehose (/var/task/index.js:76:12)",
        "    at dynamoDBToCSVArray (/var/task/index.js:160:11)",
        "    at /var/task/index.js:56:21",
        "    at Array.forEach (<anonymous>)",
        "    at updateAll (/var/task/index.js:47:21)",
        "    at processTicksAndRejections (internal/process/task_queues.js:97:5)",
        "    at async Runtime.exports.handler (/var/task/index.js:11:19)"
    ]
},
"promise": {},
"stack": [
    "Runtime.UnhandledPromiseRejection: TypeError: undefined is not a promise",
    "    at process.<anonymous> (/var/runtime/index.js:35:15)",
    "    at process.emit (events.js:315:20)",
    "    at process.EventEmitter.emit (domain.js:483:12)",
    "    at processPromiseRejections (internal/process/promises.js:209:33)",
    "    at processTicksAndRejections (internal/process/task_queues.js:98:32)"
]

}

4

1 回答 1

0

我也见过很多这样的例子,A​​WS sdk 只是搞砸了 Promise。相反,我建议您将回调包装在 Promise 中。

你可以这样做:

async function dynamoDBToCSV(record) {
    return Promise((resolve, reject) => {
        firehose.putRecordBatch(params, (err, data) => {
            if (err) return reject(err);
            return resolve(data);
        });
    }
}
于 2020-11-04T23:12:39.700 回答