我正在使用 Spring 集成 5.0.6。我已经完成了它的文档并创建了以下代码,该代码在 HTTP 端点上侦听并发布到 kafka 主题。
一切正常,我也在主题上收到消息。但是在 HTTP 客户端没有发送回复,它给出“在超时内没有收到回复”。
如何在以下代码中向 http 调用者发送回复:
@Bean
public DirectChannel replyChannel() {
return new DirectChannel();
}
@Bean(name = "restInputFlow")
public IntegrationFlow send() {
return IntegrationFlows
.from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class).replyChannel(replyChannel()))
.transform(new Transformer())
.handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
.enrichHeaders(
c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
.get();
}
private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {
return Kafka.outboundChannelAdapter(producerFactory)
.messageKey("key").headerMapper(mapper())
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
谢谢你的帮助。