0

我正在使用 向 Kafka 发送消息,ReplyingKafkaTemplate并且它正在使用kafka_correlationId. 但是,当它点击我的@KafkaListener方法并将其转发到回复主题时,标题会丢失。

如何保留 kafka 标头?

这是我的方法签名:

@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
  ... /* some processing */
  return outputs;
}

我已经创建了一个ProducerInterceptor,所以我可以看到从ReplyingKafkaTemplate以及@SendTo注释中发送了哪些标头。从那开始,另一个奇怪的事情是ReplyingKafkaTemplate没有将记录的kafka_replyTopic标题添加到消息中。

以下ReplyingKafkaTemplate是配置的方式:

@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
  return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
  return new ReplyingKafkaTemplate<>(pf, container);
}

我不确定这是否相关,但我也添加了Spring Cloud Sleuth作为依赖项,并且当我发送消息时 span/trace 标头在那里,但是在转发消息时会生成新的标头。

4

1 回答 1

1

默认情况下,请求消息中的任意标头不会复制到回复消息中,只有kafka_correlationId.

从 2.2 版开始,您可以配置ReplyHeadersConfigurer调用哪个来确定应复制哪些标头。

请参阅文档

从版本 2.2 开始,您可以ReplyHeadersConfigurer向侦听器容器工厂添加一个。参考此信息以确定您要在回复消息中设置哪些标头。

编辑

顺便说一句,在 2.2 中,如果没有标头,RKT 会自动设置回复。

使用 2.1.x 是可以做到的,但它有点牵涉,你必须自己做一些工作。关键是接收和回复Message<?>...

@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}
于 2019-04-11T02:17:30.737 回答