2

我想知道在 spring kafkaAckMode中设置为时提交是如何工作的。MANUAL

下面是我在KafkaConfig containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);中设置的属性

listener代码_

@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
        countDownLatch.countDown();     
        acknowledgment.acknowledge();
}

我正在acknowledgement按照 spring kafka 文档进行操作,但这仅意味着我的消息被标记为已发送但未使用(这是我的理解)。

  1. 在那种情况下,我应该调用该commitSync()方法吗?如果,我从哪里调用它,因为我需要获取对KafkaConsumer. 如果否,它在内部是如何工作的,我可以跟踪它吗?

  2. 是否有commitId返回值?我的想法是知道是否消费了特定的消费者记录。我想存储该值以用于内部跟踪目的。

  3. kafka 是否在内部维护消费者记录上的任何状态,例如(已确认、已提交未提交),这有助于分类。

这真的可以帮助我区分有多少记录被消耗,有多少正在等待处理以及它们的状态。

4

1 回答 1

2

我可以回答第一个问题。其余的一切看起来都像是 Apache Kafka 的直接故事。

由于我们不能commit从我们想要的地方执行,而只能从执行的同一线程执行consumer.poll(),我们将所有提交请求存储在内部KafkaMessageListenerContainer队列中,并在执行之前查看主消费者循环中的内容this.consumer.poll()

即使您使用MANUAL_IMMEDIATE,实际consumer.commitSync()是在与您的不同线程上执行的acknowledgment.acknowledge()

OTOH 将 API 交给那里,Consumer如下所示:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

所以,没有任何commitId钩子可以解决。

Not Committed我认为在 Apache Kafka 之类的或其他任何东西中都没有这样的概念。数据始终存在于主题日志中,并且在特定的管理操作或压缩配置之前不会从那里删除。

我认为该commit offset功能与consumer group目的完全相关,并且根据我们拥有的 JavaDocs:

* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1.

因此,当您的消费者死亡时,它将从其组的最后提交的偏移量重新启动。不同的组可能会读取相同的数据,但来自其他偏移量。我认为这绝对是为什么他们的 API 没有提供任何挂钩到实际状态的原因。没有这样的!

于 2017-02-23T18:28:11.840 回答