在 Spring Integration 中使用 webflux gateway Java DSL 时,我遇到了失败的回复。它仅适用于前几个请求(具体为<8),之后我收到回复错误:
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
当我.fluxTransform(f -> f)
在入站网关上使用 或使用非反应式http 出站网关时,即使在具有数千个请求的 jmeter 基准测试中,我也不会收到错误消息。
- 为什么我必须调用
fluxTransform(f -> f)
第一个流程才能使其工作? - 为什么
fluxTransform(f -> f)
我Http.outboundGateway
在第二个流程中使用时它没有工作?
场景
我已经使用四个网关创建了一个路由,用于在远程机器上发出 Web 请求的相当复杂的设置,但我
整合流程一:
入站 webflux 网关 -> 出站 jms 网关
@Bean
public IntegrationFlow step1() {
// request-reply pattern using the jms outbound gateway
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// send a request to jms, wait for the reply and return message payload as response
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// won't work consistently without the next line
.fluxTransform(f -> f)
.handle(gateway).get();
}
集成流程 2:
入站 jms 网关 -> 出站 webflux 网关
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// ignore headers
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// use webflux outbound gateway to send the request to the TEST_URL
.handle(gateway).get();
}
完整的路线如下所示:
客户端 Web 请求 -> 流 1 ->(消息代理)-> 流 2 -> 服务器 Web 请求