0

我正在使用 Kafka:2.11-1.0.1。应用程序包含主题“X”的并发数=5 的消费者,分区数=5。

当应用程序重新启动并且消息在分区分配之前发布到主题'X'上时,主题'X'的5个消费者找到组协调器并向组协调器发送加入组请求。预计会从组协调员那里得到响应,但没有收到响应。

我检查了 Kafka 服务器日志,但找不到相关日志,发现 DEBUG 日志级别。

当我运行描述消费者组命令时,会进行以下观察:

  1. 消费群体正在重新平衡
  2. 有一定滞后的老消费者
  3. 具有一些随机名称的新消费者。随着时间的推移,新的消费者数量正在增加。

新消息发布在主题“X”上,但消费者没有收到。

heartbeat 和 session.time.out 设置为默认值。

如果在为主题“X”及其使用者分配分区之前发布消息,则会出现此问题。

我的疑问是:为什么重新平衡没有完成,以至于新的消费者开始消费新产生的消息?

4

1 回答 1

0

应用程序在消费者组中有以下消费者

  • 消费者 A 收听 Topic1。Topic1 有 1 个分区。此消费者的 max.poll.interval.time.ms=4 小时。
  • 消费者 B 收听 Topic2。Topic2 有 5 个分区。消费者 B 并发=5。此消费者的 max.poll.interval.time.ms=1 小时。

应用程序重新启动时发生的情况以及主题之一是否已发布消息

  • 当应用程序重新启动创建的消费者 (consumerA1) 的一个实例并订阅 topic1 时。ConsumerA1 找到组坐标(GC)并发送加入组请求。
  • ConsumerA1 从 GC 获得响应并成为领导者。直到这一步,其他消费者还没有初始化。
  • ConsumerA1 分配分区并向 GC 发送 SyncGroup 请求。新的任务生成发生。这样第一次rebalance就完成了。
  • topic1 上的消息已发布,consumerA1 获取此消息并开始处理。处理完成此消息需要大量时间。(说 2 小时)
  • 现在5个consumer实例一个一个初始化,都订阅了topic2。这些消费者找到 GC 并发送加入组请求。但 GC 没有回应他们。
  • 当 consumerA1 向 GC 发送心跳时,GC 会响应重新平衡正在进行但 consumerA1 不会撤销分区,因为它正在处理消息。
  • 根据再平衡协议(关于再平衡的好文章),GC 等待所有消费者发送加入组请求。在这种情况下,GC 等待从 consumerA1 获取加入组请求。最长等待时间为 max.poll.interval.time.ms,即在这种情况下为 4 小时。

根本原因:

组协调器在应用程序重启后没有等待所有消费者初始化,因此首先发生了不必要的重新平衡,因此消费者A1从分区获取消息并开始处理它。

解决方案: 为了避免这种不必要的初始重新平衡,kafka 提供了一种配置,其中 Group Coordinator 等待消费者加入新的消费者组。文档

group.initial.rebalance.delay.ms

检查了我的 kafka server.properties ,它被设置为 0。尝试使用默认值,即 3 秒。避免了初始重新平衡,GC 在应用程序重新启动时等待 3 秒,此时所有其他消费者都已初始化。所有消费者都发送了加入组请求,因为所有 GC 都收到了来自所有消费者的请求。GC 立即响应,重新平衡已成功进行并完成。

于 2020-07-06T10:06:00.710 回答