我想知道在 spring kafkaAckMode
中设置为时提交是如何工作的。MANUAL
下面是我在KafkaConfig
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);中设置的属性
listener
代码_
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
acknowledgment.acknowledge();
}
我正在acknowledgement
按照 spring kafka 文档进行操作,但这仅意味着我的消息被标记为已发送但未使用(这是我的理解)。
在那种情况下,我应该调用该
commitSync()
方法吗?如果是,我从哪里调用它,因为我需要获取对KafkaConsumer
. 如果否,它在内部是如何工作的,我可以跟踪它吗?是否有
commitId
返回值?我的想法是知道是否消费了特定的消费者记录。我想存储该值以用于内部跟踪目的。- kafka 是否在内部维护消费者记录上的任何状态,例如(已确认、已提交、未提交),这有助于分类。
这真的可以帮助我区分有多少记录被消耗,有多少正在等待处理以及它们的状态。