我正在使用 NodeJS 来消费来自 Kafka 的消息,收到消息后,我会将它带到 Elasticsearch 中创建索引。这是我的一段代码:
kafkaConsumer.on('message', function (message) {
elasticClient.index({
index: 'test',
type: 'sample',
body: message
}, function (error, response) {
if (error) {
// Stop consuming message here
console.log(error);
}
console.log(response);
});
});
我想确保在继续使用下一条消息之前必须成功创建索引,因为我不希望任何消息丢失。