1

目前,我们正在使用事务性 Kafka 生产者。我们注意到的是缺少 Kafka 的跟踪方面,这意味着我们看不到 Kafka 生产者的检测,因此缺少 b3 标头。

浏览代码后,我们发现没有为事务生产者调用后处理器,这意味着 TraceProducer 永远不会由 TraceProducerPostProcessor 创建。这有什么原因吗?此外,为事务生产者启用跟踪的解决方法是什么?似乎没有一个地方可以轻松创建跟踪生产者(DefaultKafkaProducerFactory #doCreateTxProducer 是私有的)

附加屏幕截图(DefaultKafkaProducerFactory 类)。在屏幕截图中,您可以看到仅为原始生产者调用后处理器,而不是为事务生产者调用。

您的帮助将不胜感激。

谢谢

DefaultKafkaProducerFactory#createRawProducer

4

2 回答 2

1

??

createRawProducer()对事务性和非事务性生产者都调用:

在此处输入图像描述

其他事情正在发生。

编辑

问题是侦探用不同的生产者替换了生产者,但工厂丢弃了它并使用了原件。

https://github.com/spring-projects/spring-kafka/issues/1778

编辑2

实际上,我们在这里丢弃跟踪生产者是一件好事;Sleuth 还将工厂包装在代理中,并将工厂包装CloseSafeProducerTracingProducer; 但我看到交易和非交易生产者的结果相同......

@SpringBootApplication
public class So67194702Application {

    public static void main(String[] args) {
        SpringApplication.run(So67194702Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ProducerFactory<String, String> pf) {
        return args -> {
            Producer<String, String> prod = pf.createProducer();
            prod.close();
        };
    }

}

close()在...上设置断点

在此处输入图像描述

于 2021-04-21T12:52:52.577 回答
0

感谢 Gary Russell 的快速回复。createRawConsumer 有效地为事务性和非事务性消费者调用。

Sleuth 使用 TraceConsumerPostProcessor 将 Kafka 消费者包装到 TracingConsumer 中。由于 ProducerPostProcessor 接口扩展了 Function 接口,我们可以假设可以/应该使用函数的结果,但 DefaultKafkaProducerFactory 的 createRawConsumer 方法正在应用后处理器而不使用返回类型。在这种特定情况下导致问题。

那么,我们不能修改 createRawConsumer 的实现来分配后处理器的结果吗?如果没有,让后处理器扩展消费者而不是函数不是更好吗?

通过覆盖 createRawConsumer 方法进行的成功测试如下

    @Override
    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        Producer<K, V> kafkaProducer = new KafkaProducer<>(rawConfigs, getKeySerializerSupplier().get(), getValueSerializerSupplier().get());
        for (ProducerPostProcessor<K, V> pp : getPostProcessors()) {
            kafkaProducer = pp.apply(kafkaProducer);
        }
        return kafkaProducer;
    }

谢谢您的帮助。

于 2021-04-21T13:30:02.570 回答