0

我正在使用 NodeJS 来消费来自 Kafka 的消息,收到消息后,我会将它带到 Elasticsearch 中创建索引。这是我的一段代码:

kafkaConsumer.on('message', function (message) {
    elasticClient.index({
        index: 'test',
        type: 'sample',
        body: message
    }, function (error, response) {
        if (error) {

            // Stop consuming message here

            console.log(error);
        }
        console.log(response);
    });
});

我想确保在继续使用下一条消息之前必须成功创建索引,因为我不希望任何消息丢失。

4

2 回答 2

0

试一试consumer.pause()/resume()
pause() 暂停消费者
resume() 恢复消费者

于 2016-05-31T12:51:46.953 回答
0

您可以暂停您的消费者,使其不会订阅和获取新消息。一旦您从 elasticClient 获得响应,您就可以恢复您的 kafka 消费者。

kafkaConsumer.pause();
kafkaConsumer.resume();

根据您的设置使用它。

于 2016-05-31T12:54:59.630 回答