3

有时,kafka-node 消费者从偏移量 0 开始消费,而它的默认行为是只消费较新的消息。然后它不会切换回其默认行为。你知道如何解决这个问题,会发生什么以及它的行为突然改变吗?代码非常简单,无需更改代码即可实现。

var kafka = require("kafka-node") ;
  Consumer = kafka.Consumer;
  client = new kafka.KafkaClient();


  consumer = new Consumer(client, [{ topic: "Topic_23", partition: 0}
                                    ]);


consumer.on("message", function(message) {

    console.log(message)


  });

到目前为止,我发现的唯一解决方案是更改 kafka 主题。然后一切正常。有任何想法吗 ?

4

1 回答 1

1

在 Kafka 中,偏移量与特定的消费者无关,而是与消费者组相关联。在您的代码中,您没有提供消费者组,因此,每次启动消费者时,它都会被分配给不同的消费者组,因此偏移量从0.

以下应该可以解决问题(显然是您第一次阅读所有消息时):

var kafka = require("kafka-node") ;

Consumer = kafka.Consumer;
client = new kafka.KafkaClient();

payload = [{
    topic: "Topic_23", 
    partition: 0
}]

var options = {
    groupId: 'test-consumer-group',
    fromOffset: 'latest'
};


consumer = new Consumer(client, payload, options);
consumer.on("message", function(message) {
    console.log(message)
  });
于 2020-01-31T15:04:15.123 回答