2

在 Confluent Cloud 上使用 Kafka 时,我发现了一个奇怪的行为。我创建了一个具有默认分区值的主题:6。

我的系统由一个向该主题发送消息的 Java Producer 应用程序和一个从中读取并执行每个消息操作的 Kafka Streams 应用程序组成。

-----------------------          --------            -----------
| Kafka Java Producer |  ---->  | topic | ---->      | KStream |
-----------------------          --------            -----------

目前我只启动 Kafka Streams 应用程序的一个实例,因此消费者组有一个成员。

这是我观察到的:

  1. 生产者发送一条消息,并将其记录在偏移量为 0的事件主题中:

在此处输入图像描述

  1. 消息到达 KStream,正在正确处理,正如我在 KStream 日志跟踪中看到的那样:

韩流

events.foreach { key, value ->
    logger.info("--------> Processing TimeMetric {}", value)
    //Store in DB

日志

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {"...

  1. 在 Confluent Cloud 消费者滞后中,我可以看到所有消费者组及其状态。KStream 有一个名为events-processor-19549050-d8b0-4b39.... 如前所述,该组只有一个成员(KStream 的唯一实例)。但是,如果显示该组在分区 2 中的一条消息后面。此外,请注意当前偏移量似乎为 1,结束偏移量为 2):

在此处输入图像描述

  1. 如果我在生产者中发送另一条消息,它会再次记录在主题中,但这次使用偏移量 2 而不是 1

在此处输入图像描述

  1. 消息到达KStream,再次正常处理:

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {

  1. 回到消费者组的消费者滞后,它仍然落后一条消息,仍然有一些奇怪的偏移量(当前 3,结束 4):

在此处输入图像描述

尽管处理似乎很好,但上面显示的状态并没有多大意义。能否解释一下原因:

  1. 消息偏移量增加 +2 而不是 +1?
  2. 即使正确处理了消息,消费者组似乎也落后了 1 条消息?
4

1 回答 1

2

对于第一个问题,有两种可能性(尽管通过阅读第二个问题,您似乎正在使用事务):

  • 如果您没有使用一次性语义,则生产者可能会发送多个消息,因为之前发送的内容无法控制。这样,由于那些重复的消息,Kafka 的默认at-least-once语义可能会增加您的偏移量 >+1。

  • 如果您使用一次性语义或事务,则事务的每个事件都会将标记写入主题,以用于内部控制目的。这些标记是 +2 增加的原因,因为它们也存储在主题中(但被消费者避免)。Confluent 的交易指南也有一些关于这种行为的信息:

    在生产者发起提交(或中止)后,协调者开始两阶段提交协议。

    在第一阶段,协调器将其内部状态更新为“prepare_commit”,并在事务日志中更新此状态。一旦完成,无论如何都保证事务被提交。

    然后协调器开始第 2 阶段,在该阶段它将事务提交标记写入作为事务一部分的主题分区。

    这些事务标记不暴露给应用程序,但消费者在 read_committed 模式下使用它来过滤来自中止事务的消息,并且不返回属于打开事务的一部分的消息(即那些在日志中但没有与它们关联的事务标记)。

    一旦标记被写入,事务协调器将事务标记为“完成”,生产者可以开始下一个事务。

一般来说,您不应该关心偏移量,因为它不是查看的权威指南。例如,重试、重复或事务标记会使偏移量与您对生产者的期望不同,但您不必担心;您的消费者会,而且他们只会处理“真实”的消息。

关于问题 2,这是一个已知问题:https ://issues.apache.org/jira/browse/KAFKA-6607

引用jira:

当使用事务编写 Kafka Streams 应用程序的输入主题时,如果到达主题末尾,Kafka Streams 不会提交“endOffset”而是“endOffset - 1”。原因是提交标记是主题中的最后一个“消息”;流提交“最后处理的消息的偏移量加 1”并且不考虑提交标记。

这不是正确性问题,但是当通过 bin/kafka-consumer.group.sh 检查消费者滞后时,滞后显示为 1 而不是 0——从消费者组工具的角度来看是正确的。

希望能帮助到你!

于 2019-07-19T08:50:30.327 回答