我正在使用 向 Kafka 发送消息,ReplyingKafkaTemplate
并且它正在使用kafka_correlationId
. 但是,当它点击我的@KafkaListener
方法并将其转发到回复主题时,标题会丢失。
如何保留 kafka 标头?
这是我的方法签名:
@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
... /* some processing */
return outputs;
}
我已经创建了一个ProducerInterceptor
,所以我可以看到从ReplyingKafkaTemplate
以及@SendTo
注释中发送了哪些标头。从那开始,另一个奇怪的事情是ReplyingKafkaTemplate
没有将记录的kafka_replyTopic
标题添加到消息中。
以下ReplyingKafkaTemplate
是配置的方式:
@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
return new ReplyingKafkaTemplate<>(pf, container);
}
我不确定这是否相关,但我也添加了Spring Cloud Sleuth作为依赖项,并且当我发送消息时 span/trace 标头在那里,但是在转发消息时会生成新的标头。