背景 :
将消费者拦截器设置为 StreamsConfig 将确保在使用/提交消息时调用拦截器。片段来自org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return true;
}
但是consumerInterceptor.onCommit()
即使我看到在源主题中提交了偏移量,也从未调用过。
问题:
我认为这是因为我使用的是启用了 Exactly once 处理保证的 kstreams。
这是当时的逻辑org.apache.kafka.streams.processor.internals.StreamTask#commit
if (this.eosEnabled) {
this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
this.producer.commitTransaction();
if (startNewTransaction) {
this.producer.beginTransaction();
}
} else {
this.consumer.commitSync(consumedOffsetsAndMetadata);
}
正如你所看到的,consumer.commitSync
which 反过来调用consumerCoordinator.commit
which 调用interceptor.onCommit
, 永远不会被调用,因为启用了 eos,它是被调用的事务 api。
问题: 当在启用 eos 的源主题上提交偏移量时,有没有办法可以将回调挂钩到 kstream?