我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每个记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但是,如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它,直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。
KafkaMessageListenerContainer 是否可以做到这一点?
我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每个记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但是,如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它,直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。
KafkaMessageListenerContainer 是否可以做到这一点?
寻求特定的偏移量。
为了寻找,你的监听
ConsumerSeekAware
器必须实现以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
第一个在容器启动时调用;在初始化后的某个任意时间进行搜索时,应使用此回调。您应该保存对回调的引用;如果您在多个容器(或 a
ConcurrentMessageListenerContainer
)中使用相同的侦听器,则应将回调存储在ThreadLocal
由侦听器线程键入的 a 或其他一些结构中。
“对于现有的组 ID,初始偏移量是该组 ID 的当前偏移量”。
并且 Confluent 还提到:“如果消费者崩溃或被关闭,它的分区将被重新分配给另一个成员,该成员将从每个分区的最后一个提交的偏移量开始消费”
这意味着,在您的情况下,当消费者恢复时,它将在最后提交的偏移量处继续。如果您手动提交,您也不需要“寻找失败的偏移量”
ENABLE_AUTO_COMMIT_CONFIG = 假
和
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)