我有一个来自订阅主题的消费者轮询。它消耗每条消息并进行一些处理(在几秒钟内),推送到不同的主题并提交偏移量。
总共有5000条消息,
重启前 - 消耗了 2900 条消息并提交了偏移量
重新启动后 - 从偏移量 0 开始消耗。
即使消费者是使用相同的消费者组创建的,它也会从偏移量 0 开始处理消息。
kafka 版本 (strimzi) > 2.0.0 kafka-python == 2.0.1
我有一个来自订阅主题的消费者轮询。它消耗每条消息并进行一些处理(在几秒钟内),推送到不同的主题并提交偏移量。
总共有5000条消息,
重启前 - 消耗了 2900 条消息并提交了偏移量
重新启动后 - 从偏移量 0 开始消耗。
即使消费者是使用相同的消费者组创建的,它也会从偏移量 0 开始处理消息。
kafka 版本 (strimzi) > 2.0.0 kafka-python == 2.0.1
我们不知道您的主题中有多少个分区,但是当消费者在同一个消费者组中创建时,他们将消费来自不同分区的记录(我们不能在一个消费者组中有两个消费者从同一个分区消费并且如果您添加一个消费者,组协调器将执行重新平衡过程以将每个消费者重新分配到特定分区)。
我认为偏移量 0 来自auto.offset.reset
可以是的属性:
latest
: 从日志中的最新偏移开始earliest
:从最早的记录开始。none
:不存在偏移数据时抛出异常。但是只有当您的消费者组没有提交有效的偏移量时,此属性才会生效。
注意:主题中的记录具有保留期 log.retention.ms
属性,因此当您处理日志中的第一条记录时,您的最新消息可能会被删除。
问题:当您想使用来自一个主题的消息并处理数据并将它们写入另一个主题时,为什么您不使用 Kafka Streaming ?