7

我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每个记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。

但是,如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它,直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。

KafkaMessageListenerContainer 是否可以做到这一点?

4

2 回答 2

9

寻求特定的偏移量

为了寻找,你的监听ConsumerSeekAware器必须实现以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

第一个在容器启动时调用;在初始化后的某个任意时间进行搜索时,应使用此回调。您应该保存对回调的引用;如果您在多个容器(或 a ConcurrentMessageListenerContainer)中使用相同的侦听器,则应将回调存储在ThreadLocal由侦听器线程键入的 a 或其他一些结构中。

于 2017-03-24T12:45:16.297 回答
0

请参考 Spring Kafka 文档

“对于现有的组 ID,初始偏移量是该组 ID 的当前偏移量”。

并且 Confluent 还提到:“如果消费者崩溃或被关闭,它的分区将被重新分配给另一个成员,该成员将从每个分区的最后一个提交的偏移量开始消费”

这意味着,在您的情况下,当消费者恢复时,它将在最后提交的偏移量处继续。如果您手动提交,您也不需要“寻找失败的偏移量”

ENABLE_AUTO_COMMIT_CONFIG = 假

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)

于 2020-07-03T04:10:53.443 回答