我正在使用 confluent-kafka Python 库从 kafka 中读取。我正在使用以下消费者设置
Consumer ={
"bootstrap.servers" : kafka_server,
"group_id" : "testing",
"auto.offset.reset" : "latest"}
我的目标是确保我始终阅读 kafka 中的最新消息。只要程序继续运行,上述方法就可以工作。但是,如果程序由于某种原因崩溃,它会从上次使用的消息开始读取,而不是从主题中的最后一条消息开始读取。
我不介意丢失一些消息,但我始终阅读最新消息是绝对必要的。看起来消费者记住了偏移量并从它开始而不是从最新的偏移量开始。
我尝试将enable.auto.commit
参数设置为 False,但我得到了相同的结果。