我厌倦了链接
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
使用 SimpleConsumer 来消费消息,但是在使用它时我发现了一些突然的行为,如下所示:
消费者正在消费来自特定分区的消息。但问题是,当我的消费者正在运行并且我使用生产者将消息推送到主题时,它会使用来自该分区的消息。但是,如果我的消费者目前没有运行并且我将一些消息推送到主题并再次启动消费者,它不会消费生产者推送的消息,但它再次准备好消费现在将被推送的消息。我正在使用 LatestTime() 代替 od EarliestTime() 因为我只想使用未处理的消息。
例如
情况1
消费者正在运行:
Producer将M1、M2、M3消息推送到topic 1的partition 1
结果:消费者将消费所有三个消息。
案例 - 2
消费者没有运行
producer 现在将 m4、m5 m6 messgae 推送到主题 1 的分区 1
现在调用消费者
结果:消费者不使用消息 m4、m5、m6,但如果我检查偏移量,则它设置为 7。这意味着生产者在生成消息时已将偏移量提前到 7,因此消费者现在将使用来自偏移量 7 的消息
理想情况下,当消费者再次出现时,它应该从 m4 读取消息,请提供帮助。