0

假设Kafka, 1 partition, 2 consumers.(第二个消费者空闲)

假设第一个消费了一条消息,然后用其他 3 个服务处理它,突然粘在其中一个上,错过了 Kafka 的超时。

卡夫卡是否会将分区重新分配给第二个消费者并且消息将被双重处理(假设第一个最终成功)?

4

1 回答 1

1

如果 Kafka 的消费者处理消息的时间过长怎么办?Kafka 是否会将此分区重新指定给另一个消费者,并且消息会被双重处理?

对,那是正确的。如果 Kafka 消费者处理消息的时间过长,并且后续 poll() 被延迟,则 Kafka 将重新将该分区分配给另一个消费者,并且将再次(一次又一次)处理该消息。

为了更清楚,首先我们需要决定并定义“多长是太长?”。

这是由属性定义的max.poll.interval.ms。从文档中,

使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。

如果在这段时间内没有调用 poll(),消费者组将重新平衡。

还有一个属性auto.commit.interval.ms。仅在轮询期间将调用自动提交偏移量检查 - 它检查经过的时间是否大于配置的自动提交间隔时间,如果结果为是,则提交偏移量。

如果 Kafka 消费者处理记录的时间过长,则后续 poll() 调用也会延迟,并且不会提交最后一次 poll() 返回的偏移量。如果此时发生再平衡,分配给该分区的新消费者客户端将重新开始处理消息。

通过增加此值可以避免消费者组重新平衡和由此产生的分区重新分配。这将增加轮询之间的允许间隔,并为消费者提供更多时间来处理从 poll() 返回的记录。消费者只会在轮询调用中加入重新平衡,因此增加最大轮询间隔也会延迟组重新平衡。

将最大轮询间隔增加到一个大值还有一个问题。如果消费者因其他原因死亡,则max.poll.interval.ms检测故障所需的时间比配置的间隔要长。

session.timeout.ms并且heartbeat.interval.ms在这种情况下可以尽可能早地检测到完全故障。

有关这些参数的更多详细信息:

请注意,配置的值session.timeout.ms必须在代理配置中通过属性配置的允许范围内

  • group.min.session.timeout.ms
  • group.max.session.timeout.ms

否则,启动消费者客户端时将引发以下异常。

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

更新:为了避免再次处理消息

KafkaConsumer 类中还有另一个方法commitAsync()可以触发提交偏移操作。

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

有关 commitSync() 和 commitAsync() 的更多详细信息,请查看此线程

手动提交偏移量是表示该偏移量已被处理,因此 Kafka 不会再次发送同一分区的已提交记录。当手动提交偏移量时,请务必注意,如果消费者在处理记录之前因任何原因死亡,则这些记录有可能不会再次被处理。

于 2021-11-24T04:45:33.480 回答