1

我正在使用 confluent-kafka Python 库从 kafka 中读取。我正在使用以下消费者设置

Consumer ={
"bootstrap.servers" : kafka_server,
"group_id" : "testing",
"auto.offset.reset" : "latest"}

我的目标是确保我始终阅读 kafka 中的最新消息。只要程序继续运行,上述方法就可以工作。但是,如果程序由于某种原因崩溃,它会从上次使用的消息开始读取,而不是从主题中的最后一条消息开始读取。

我不介意丢失一些消息,但我始终阅读最新消息是绝对必要的。看起来消费者记住了偏移量并从它开始而不是从最新的偏移量开始。

我尝试将enable.auto.commit参数设置为 False,但我得到了相同的结果。

4

2 回答 2

0

enable.auto.commit 应该是 true,如果你想实现这种情况。

由于您有 enable.auto.commit='false',这意味着您的代码(消费者)有责任提交偏移量。如果发生崩溃,它可能无法保证提交偏移量,这会导致您的应用程序从最后一条消费消息开始。

配置“最新”并不意味着消费者会跳过消息并处理最新消息。

于 2021-04-21T18:23:48.733 回答
-1

如果您想阅读最新消息,请始终使用唯一group_id的消费者并确保auto.offset.reset是最新的。

您可以始终使用uuid生成随机 id

 Consumer ={ "bootstrap.servers" : kafka_server, "group_id" : uuid.uuid4(), "auto.offset.reset" : "latest"}
于 2021-04-21T16:10:36.253 回答