74

我是 Kafka 的新用户,现在已经试用了大约 2-3 周。我相信目前我对 Kafka 在很大程度上是如何工作的有很好的了解,但是在尝试为我自己的 Kafka 消费者安装 API 之后(这很模糊,但我正在遵循应该的新 KafkaConsumer 的指导方针可用于 v 0.9,它在“主干”回购 atm 上)如果我有多个具有相同 groupID 的消费者,我会遇到从一个主题消耗的延迟问题。

在此设置中,我的控制台始终记录有关“重新平衡触发”的问题。当我将新的消费者添加到消费者组时是否会发生重新平衡,并且是否会触发它们以找出同一 groupID 中的哪个消费者实例将获得哪些分区或重新平衡完全用于其他用途?

我还从https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design看到了这段话,我似乎无法理解它,所以如果有人可以帮我制作不胜感激的感觉:

重新平衡是一组消费者实例(属于同一组)协调以拥有该组订阅的主题分区的互斥集的过程。在消费者组的成功重新平衡操作结束时,所有订阅主题的每个分区都将由组内的单个消费者实例拥有。再平衡的工作方式如下。每个代理都被选为消费者组子集的协调者。组的协调者代理负责编排针对订阅主题的消费者组成员资格更改或分区更改的重新平衡操作。它还负责将生成的分区所有权配置传达给正在进行重新平衡操作的组的所有使用者。

4

4 回答 4

69

当一个新的消费者加入一个消费者组时,一组消费者会尝试“重新平衡”负载以将分区分配给每个消费者。如果在此分配发生时消费者集发生变化,重新平衡将失败并重试。此设置控制放弃前的最大尝试次数。

执行此操作的命令是:rebalance.max.retries,默认设置为 4。

此外,如果以下情况属实,则可能会发生这种情况:

ZooKeeper 会话超时。如果消费者在这段时间内未能向 ZooKeeper 发送心跳,则认为它已死亡,并且将发生重新平衡。

希望这可以帮助!

于 2015-06-22T19:21:45.000 回答
53

重新平衡是在给定消费者组内的消费者之间重新分配分区所有权。请记住,消费者组中的每个消费者都被专门分配了一个或多个主题分区。

再平衡发生在:

  • 消费者加入群组
  • 消费者彻底关闭
  • 组协调员认为消费者已死亡。这可能发生在崩溃之后或消费者忙于长时间运行的处理时,这意味着消费者在配置的会话间隔内同时没有向组协调器发送心跳
  • 添加了新分区

作为为消费者组指定的组协调者(集群中的代理之一)和组长(加入组的第一个消费者), Rebalance可以或多或少地描述如下:

  • 领导者从 组协调器接收组中所有消费者的列表(这将包括最近发送心跳并因此被认为是活动的所有消费者)并负责为每个消费者分配一个分区子集。
  • 在决定分区分配后(Kafka 有几个内置的分区分配策略),组长将分配列表发送给组协调器,组协调器将此信息发送给所有消费者。

这适用于 Kafka 0.9,但我很确定新版本仍然有效。

于 2017-10-16T15:42:58.843 回答
12

消费者重新平衡决定哪个消费者负责某些主题的所有可用分区的哪个子集。例如,您可能有一个具有 20 个分区和 10 个消费者的主题;在重新平衡结束时,您可能希望每个消费者从 2 个分区中读取数据。如果您关闭其中的 10 个消费者,您可能希望每个消费者在重新平衡完成后拥有 1 个分区。消费者重新平衡是一种动态分区分配,可以由 Kafka 自动处理。

Group Coordinator是负责与消费者通信以实现消费者之间重新平衡的代理之一。在早期版本中,Zookeeper 存储元数据详细信息,但最新版本存储在代理上。消费者协调器接收来自消费者组的所有消费者的心跳和轮询,因此请注意每个消费者的心跳并管理他们在分区上的偏移量。

组长: 一个消费者组作为组长,由组协调员选择,负责代表组中的所有消费者做出分区分配决策。

再平衡场景:

  1. 消费者组订阅任何主题

  2. 消费者实例无法发送带有 session.heart.beat 时间间隔的心跳。

  3. 消费者长进程超过轮询超时

  4. 消费者组的消费者通过异常

  5. 添加了新分区。

  6. 扩大和缩小消费者。手动添加新消费者或删除现有消费者

消费者再平衡

当消费者请求加入或离开群组时发起消费者重新平衡。Group Leader 从 Group Coordinator 收到所有活跃消费者的列表。Group Leader 使用 PartitionAssigner 决定分配给每个消费者的分区。一旦 Group Leader 完成分区分配,它会将分配列表发送给 Group Coordinator,Group Coordinator 会将这些信息发回给所有消费者。组仅将适用的分区发送给他们的消费者,而不是其他消费者分配的分区。只有 Group Leader 知道所有消费者及其分配的分区。重新平衡完成后,消费者开始向 Group Coordinator 发送 Heartbeat,表明它还活着。消费者向 Group Coordinator 发送一个 OffsetFetch 请求,以获取其分配的分区的最后提交的偏移量。

状态管理

在重新平衡时,Group coordinator 将其状态设置为 Rebalance 并等待所有消费者重新加入该组。

当组开始重新平衡时,组协调器首先将其状态切换为重新平衡,以便通知所有交互的消费者重新加入组。一旦重新平衡完成,组协调器创建新的生成 ID 并通知所有消费者和组继续同步阶段,消费者发送同步请求并继续等待组领导完成生成新的分配分区。一旦消费者收到一个新的分配分区,他们就会进入一个稳定的阶段。

在此处输入图像描述

静态成员

这种重新平衡是一项相当繁重的操作,因为它需要停止所有消费者并等待获取新分配的分区。在每次重新平衡时,总是创建新的一代 id 意味着刷新一切。为了解决这个开销,Kafka 2.3+ 引入了静态成员来减少不必要的重新平衡。KIP-345

在静态成员中,消费者状态将持续存在,并且在重新平衡时,相同的分配将得到应用。它使用新的 group.instance.id 来保留成员身份。因此,即使在最坏的情况下,成员 id 也会重新洗牌以分配新分区,但相同的消费者实例 ID 仍将获得相同的分区分配

instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

重启后:

instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}   

参考:

  1. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership

  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

于 2019-11-03T05:35:21.387 回答
0

Consumer Group、Consumer 和 Partition Rebalance Kafka Consumer 可以消费/订阅多个主题并开始接收消息。Kafka 消费者通常是消费者组的一部分。当多个消费者订阅一个主题并属于同一个消费者组时,组中的每个消费者将接收来自主题中不同分区子集的消息。

因此,消费者组中的消费者共享他们订阅的主题中的分区的所有权。当我们向组中添加一个新的消费者时,它开始消费来自之前被另一个消费者消费的分区的消息。当消费者关闭或崩溃时,也会发生同样的事情;它离开了组,它用来消费的分区将被剩余的消费者之一消费。当消费者组正在消费被修改(如添加新分区)时,也会将分区重新分配给消费者。

将分区所有权从一个消费者转移到另一个消费者称为重新平衡”在重新平衡期间,消费者不能消费消息,因此我们可以说重新平衡是整个消费者组不可用的短暂窗口。它还会导致消费者方面的一些其他活动,例如当分区从一个消费者移动到另一个消费者时,cosnumer 会丢失其当前状态,例如如果有任何数据被缓存,那么它需要刷新其缓存,从而减慢整个应用程序的速度,直到消费者被设置。它的状态再次。

heartbeat.interval.ms

消费者维护消费者组中的成员资格,分配给他们的分区的所有权是通过向指定为组协调器的 Kafka 代理发送心跳来实现的,并且对于不同的消费者组来说,它会有所不同。只要消费者定期发送心跳,那么它就被认为是活着的并继续处理来自指定分配分区的消息 当消费者调用 poll 方法(从分区中检索记录)和提交记录时发送心跳消耗。

如果消费者长时间停止发送心跳并且其会话将超时(由session.timeout.ms控制),则组协调器将认为它已死并因此触发重新平衡。如果消费者崩溃并且没有处理消息,则组协调器需要几秒钟没有心跳来确定它已死并触发重新平衡。当一个consumer干净的关闭时,consumer会通知group coordinator它要离开group,coordinator会立即触发rebalance,减少消息不可用的时间。

于 2021-05-27T13:56:58.590 回答