1

我通过以下代码将 G1 组中的一个 Kafka 消费者的获取偏移量倒回到 off1:

  consumer1.set_topic_partitions((topic, partition, off1))

在上面的行中,off1 是对应分区中主题的最旧可用消息的偏移量。

现在我在不同的 G2 组中实例化一个 Kafka 消费者,如下所示:

 consumer2 = KafkaConsumer(bootstrap_servers=broker,
                                  auto_offset_reset='smallest',
                                  auto_commit_enable=True,
                                  auto_commit_interval_ms=3000,
                                  group_id='G2'
                                  )

在这里,我将读取偏移量读取为 off1,这与我为 G1 组中的 consumer1 重置的偏移量相同。我认为这不应该发生,因为不同的组偏移量应该不同。如果有人澄清,我将非常感激。提前致谢。

4

1 回答 1

1

两个消费者群体是否阅读相同的主题?

如果是这样,consumer1 被设置为最早的偏移量,同样,由于以下设置,consumer2 也被设置为该主题的最早偏移量:

auto_offset_reset='smallest'

这将在主题的最早偏移处启动消费者。要从主题的最新点开始阅读 consumer2,请使用:

auto_offset_reset='largest'
于 2015-09-04T15:58:39.600 回答