0

背景 :

将消费者拦截器设置为 StreamsConfig 将确保在使用/提交消息时调用拦截器。片段来自org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync

       if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }

但是consumerInterceptor.onCommit()即使我看到在源主题中提交了偏移量,也从未调用过。

问题:

我认为这是因为我使用的是启用了 Exactly once 处理保证的 kstreams。

这是当时的逻辑org.apache.kafka.streams.processor.internals.StreamTask#commit

        if (this.eosEnabled) {
            this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
            this.producer.commitTransaction();
            if (startNewTransaction) {
                this.producer.beginTransaction();
            }
        } else {
            this.consumer.commitSync(consumedOffsetsAndMetadata);
        }

正如你所看到的,consumer.commitSyncwhich 反过来调用consumerCoordinator.commitwhich 调用interceptor.onCommit, 永远不会被调用,因为启用了 eos,它是被调用的事务 api。

问题: 当在启用 eos 的源主题上提交偏移量时,有没有办法可以将回调挂钩到 kstream?

4

0 回答 0