8

我阅读了 Kafka 网站上的文档,但是在尝试实现一个完整的最小示例(生产者 --> kafka --> 消费者)之后,我不太清楚“消费者状态”如何处理偏移量。

一些信息

  1. 我正在使用高级 API (Java)
  2. 我的消费者是一个带有 Main 的简单类,与“快速入门”Kafka 页面上的基本相同
  3. 我正在使用动物园管理员
  4. 我正在使用单个经纪人

现在,文档说 HighLevel API 消费者使用 zookeeper 存储其状态,所以我希望偏移量,因此消费者的状态将保持在

  • Kafka 代理重新启动
  • 消费者重启

但不幸的是,它没有:每次我重新启动代理或消费者时,所有消息都会重新传递。现在,可能这些都是愚蠢的问题,但是

  1. 在 Kafka 重启的情况下:我知道这取决于消费者来保持其状态,所以很可能当代理(重新)启动重新传递所有(!)消息并且消费者决定消费什么时......对吗?如果是这样,如果我有 10.0000.0000 条消息会发生什么?

  2. 如果 JVM 消费者重启:如果状态保持在 Zookeeper 上,为什么要重新传递消息?新的 JVM 是否有可能具有不同的消费者“身份”?而在这种情况下,如何绑定之前的身份呢?

4

3 回答 3

4

是的,consumer 负责保持其状态,Java 高级 Consumer 将其状态保存在 zookeeper 中。

很可能您没有指定groupId配置属性。在这种情况下,kafka 会生成 random groupId

您也可能关闭了autocommit.enable配置属性。

可以在此页面上找到 Kafka 配置的完整参考:http: //kafka.apache.org/configuration.html“高级消费者的重要配置属性”标题下。

于 2013-02-12T19:27:38.340 回答
4

回答原始问题:使用 groupId 有助于避免“从一开始就重新消费所有消息”的情况

如果您更改 groupId,您将从创建队列的那一刻起(或自基于 kafka 日志保留策略的最后一次数据清除以来)获得所有消息

不要将此与 kafka-console-consumer “--from-beginning” 标志(设置 auto.offset.reset 选项)混淆,该标志可在下面的选项 1 和 2 之间进行选择:

1)从消费最后一条消息的那一刻开始消费新消息(不是从最初创建kafka队列的时间开始):

props.put("auto.offset.reset","smallest");

2) 从订阅者 JVM 启动的那一刻起使用新消息(在这种情况下,您可能会在订阅者关闭且未侦听队列时丢失放入队列的消息):

props.put("auto.offset.reset","最大");


旁注:以下仅与原始问题相关

对于更高级的用例 - 如果您尝试以编程方式设置消费者偏移量以从特定时间开始重播消息 - 它需要使用 SimpleConsumer API,如https://cwiki.apache.org/confluence/display/KAFKA/中所示0.8.0+SimpleConsumer+Example以找到从正确的代理/分区重放的最小偏移量。这实质上是用我们自己的 FindLeader 逻辑替换了 zookeeper。非常棘手。

对于这个用例(从某个用户指定的时间开始临时重播消息),我们决定存储消息的本地缓存并在本地管理偏移量,而不是使用 kafka 偏移量管理 API(这将需要重新实现大量 zookeeper 功能与 SimpleConsumer)。

即将kafka视为“邮递员”,一旦消息被传递,它就会进入本地邮箱,以防我们需要回到过去的某个偏移量,例如重播消息(已经被消费),例如如果发生消费者应用程序错误,我们不会回到“邮局”(kafka 经纪人)来找出正确的交付顺序,而是在本地进行管理。

旁注结束

于 2013-12-11T19:28:37.507 回答
3

看来我一直是个糟糕的读者……这一切都在配置页面中。具体来说,我的两个问题都是通过设置一个默认为“最小”的标志“autooffset.reset”来解决的,因此会导致所描述的效果。

现在,以“最大”为值,在消费者和代理重新启动的情况下,事情都按预期工作,因为偏移量始终是最大的。

于 2013-02-13T09:13:43.317 回答