0

我正在用python中的confluent-kafka创建一个消费者,我想以这样一种方式创建它,如果消费者重新启动,它会从主题中的最后一条可用消息(每个分区)开始,它是否无关紧要在没有提交的情况下留下消息。

这是为了避免处理数以百万计的消息,这些消息是在消费者关闭时生成的,并且不再需要处理。

我尝试设置参数auto.offset.reset的不同选项,但最多从最后提交的偏移量开始。这是我的配置:

consumer = Consumer({"bootstrap.servers": "localhost:9092",
                     "group.id": group_id,
                     "auto.offset.reset": "latest",
                     "isolation.level": "read_committed",
                     "default.topic.config": {"enable.auto.commit": False}})

有没有办法实现这种行为?

注意:我可能有多个消费者,但没有一个手动分配给特定分区

4

1 回答 1

0

仅当auto.offset.reset没有提交的偏移量时才应用配置。

如果您想始终从头开始重新启动,您可以使用禁用自动提交enable.auto.commit=false(并确保也不要显式提交),并设置auto.offset.resetlatest.

另一种选择是在将分区分配给消费者(使用on_assigned())时使用get_watermark_offsets()和的组合显式搜索到末尾seek()

于 2021-06-17T11:52:14.803 回答