感谢 Gary Russell 的快速回复。createRawConsumer 有效地为事务性和非事务性消费者调用。
Sleuth 使用 TraceConsumerPostProcessor 将 Kafka 消费者包装到 TracingConsumer 中。由于 ProducerPostProcessor 接口扩展了 Function 接口,我们可以假设可以/应该使用函数的结果,但 DefaultKafkaProducerFactory 的 createRawConsumer 方法正在应用后处理器而不使用返回类型。在这种特定情况下导致问题。
那么,我们不能修改 createRawConsumer 的实现来分配后处理器的结果吗?如果没有,让后处理器扩展消费者而不是函数不是更好吗?
通过覆盖 createRawConsumer 方法进行的成功测试如下
@Override
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
Producer<K, V> kafkaProducer = new KafkaProducer<>(rawConfigs, getKeySerializerSupplier().get(), getValueSerializerSupplier().get());
for (ProducerPostProcessor<K, V> pp : getPostProcessors()) {
kafkaProducer = pp.apply(kafkaProducer);
}
return kafkaProducer;
}
谢谢您的帮助。