在使用 firehose 和 dynamoDB 函数时,我无法理解如何在 node.js 中可靠地使用异步。例如:
async function dynamoDBToCSV(record) {
...
try {
const res = await firehose.putRecordBatch(params).promise();
console.log('res = ', res);
} catch (err) {
console.log('err = ', err);
}
}
调用函数的代码:
let dataCount = 0;
let items;
do {
let scanResults = [];
items = await docClient.scan(params).promise();
items.Items.forEach(async(item) => {
dataCount++;
try {
if (dataCount === 3) { // test on one packet.
await dynamoDBToCSV(item);
}
} catch(err) {
console.log('Failed at item number ' + dataCount);
return err;
}
});
params.ExclusiveStartKey = items.LastEvaluatedKey;
// } while (typeof items.LastEvaluatedKey != "undefined");
} while (false); // test mode. Runs on one page.
console.log('Interventions in DB = ' + dataCount);
这条线await docClient.scan(params).promise();
似乎工作正常。它完成了我期望调用该函数并等待响应准备好的操作。但是,await firehose.putRecordBatch(params).promise();
似乎不起作用。输出只是打印'Interventions in DB = ...',完全跳过了消防软管部分。
我似乎经常与 node.js lambdas 中的异步函数作斗争。似乎没有一种可靠的标准方法。
更新
我尝试使用一个小功能:
async function sendToFirehose(params) {
return Promise((resolve, reject) => {
firehose.putRecordBatch(params, (err, data) => {
if (err) return reject(err);
return resolve(data);
});
});
}
...
await sendToFirehose(params);
但得到错误:
{
"errorType": "Runtime.UnhandledPromiseRejection",
"errorMessage": "TypeError: undefined is not a promise",
"reason": {
"errorType": "TypeError",
"errorMessage": "undefined is not a promise",
"stack": [
"TypeError: undefined is not a promise",
" at Promise (<anonymous>)",
" at sendToFirehose (/var/task/index.js:76:12)",
" at dynamoDBToCSVArray (/var/task/index.js:160:11)",
" at /var/task/index.js:56:21",
" at Array.forEach (<anonymous>)",
" at updateAll (/var/task/index.js:47:21)",
" at processTicksAndRejections (internal/process/task_queues.js:97:5)",
" at async Runtime.exports.handler (/var/task/index.js:11:19)"
]
},
"promise": {},
"stack": [
"Runtime.UnhandledPromiseRejection: TypeError: undefined is not a promise",
" at process.<anonymous> (/var/runtime/index.js:35:15)",
" at process.emit (events.js:315:20)",
" at process.EventEmitter.emit (domain.js:483:12)",
" at processPromiseRejections (internal/process/promises.js:209:33)",
" at processTicksAndRejections (internal/process/task_queues.js:98:32)"
]
}