1

我厌倦了链接

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

使用 SimpleConsumer 来消费消息,但是在使用它时我发现了一些突然的行为,如下所示:

消费者正在消费来自特定分区的消息。但问题是,当我的消费者正在运行并且我使用生产者将消息推送到主题时,它会使用来自该分区的消息。但是,如果我的消费者目前没有运行并且我将一些消息推送到主题并再次启动消费者,它不会消费生产者推送的消息,但它再次准备好消费现在将被推送的消息。我正在使用 LatestTime() 代替 od EarliestTime() 因为我只想使用未处理的消息。

例如

情况1

消费者正在运行:

Producer将M1、M2、M3消息推送到topic 1的partition 1

结果:消费者将消费所有三个消息。

案例 - 2

消费者没有运行

producer 现在将 m4、m5 m6 messgae 推送到主题 1 的分区 1

现在调用消费者

结果:消费者不使用消息 m4、m5、m6,但如果我检查偏移量,则它设置为 7。这意味着生产者在生成消息时已将偏移量提前到 7,因此消费者现在将使用来自偏移量 7 的消息

理想情况下,当消费者再次出现时,它应该从 m4 读取消息,请提供帮助。

4

1 回答 1

0

你这样做是错的。

首先我不确定SimpleConsumer你在找什么。它迫使您自己管理偏移量(例如,它根本不向 Zookeeper 提交偏移量,并且每次您SimpleConsumer再次启动时,它都会再次获取相同的消息)。SimpleConsumer不了解“已处理的消息”。它所能做的就是从某个偏移量开始获取并继续获取,直到你说“停止”。

无论如何,如果您打算自己提交已处理的偏移量,您应该使用EarliestTimeauto.offset.reset=smallest配置条目)。auto.offset.reset意味着如果您的消费者使用错误SimpleConsumer的偏移量进行初始化(-1如果我没记错的话,使用偏移量进行初始化,这显然是错误的)它将重置为smallest可用(EarliestTime)或largest可用(LatestTime)偏移量。

为了更清楚,这里是示例:

你的Case-1

您创建一个消费者并将其指向主题 1 分区 1。由于它最初使用错误的偏移量进行初始化,它会要求代理提供一些适当的偏移量(这里是smallestorlargest偏移量重置的来源)。如果您还没有产生任何消息,则偏移量smallestlargest偏移量都将是0,因此当您产生一些消息时,您的消费者将获取这些消息。

Case-2

你产生 N 条消息(比如 7 条)。然后你开始你的SimpleConsumer. 同样,它使用错误的偏移量进行初始化,并要求代理提供正确的偏移量。使用smallest重置偏移量它将是0,并且使用largest偏移量它将是7。在您的示例中,您使用LargestOffsets的消费者将使用偏移量重新初始化7并开始使用它。

一般来说,看一下高级消费者,在大多数情况下,这就是您要寻找的东西。这是链接

于 2014-12-05T09:15:56.493 回答