我正在使用带有定义的 IntegrationFlow 的 Spring Integration Java DSL。我看到响应缺少数据片段并且聚合器响应中的correlationId 与调用服务接收到的响应中的值不匹配的行为。
背景:
我在使用随机数据且以每分钟 600 个请求运行的服务器上运行 JMeter 性能测试。在我的笔记本电脑上,我有一个 SoapUI 性能测试正在运行,它击中了同一台服务器。SoapUI 项目以每分钟 60 个请求的速率发送具有相同搜索条件(我们正在匹配)的请求。响应都应包含相同的结果数据。
大约 0.5% 的时间响应返回数据丢失。在这些响应中,从聚合器记录的响应的correlationId 和从调用服务记录的响应的correlationId(在响应返回到调用服务并且已经通过聚合器后记录)不匹配。
知道有什么问题吗?请参阅下面的代码片段。
@Configuration
@EnableAutoConfiguration
@Import(.....AServiceConfig.class)
public class ServiceConfig {
@Bean(name = "inputChannel")
public DirectChannel inputChannel() {
return new DirectChannel();
}
@Bean(name = "outputChannel")
public QueueChannel outputChannel() {
return new QueueChannel();
}
@Bean(name = "transactionLogger")
public ourLogger ourTransactionLogger() {
return OurLoggerFactory.getLogger("ourAppTrx", new ourLoggerConfig(ourTransactionLoggerKey.values()));
}
public IntegrationFlow ourFlow() {
return IntegrationFlows.from(inputChannel())
.split(splitter(ourTransactionLogger()))
.channel(MessageChannels.executor(getExecutor()))
.handle(ourServiceActivator, "service")
.aggregate(t -> t.processor(ourAggregator, AGGREGATE))
.channel(outputChannel())
.get();
}
@Bean(name = "executor")
public Executor getExecutor()
{
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
//snippet from calling service
public InquiryResponse inquire(InquiryRequest request) {
inputChannel.send(MessageBuilder.withPayload(request).build());
Message<?> msgResponse = outputChannel.receive();
InquiryResponse response = (InquiryResponse) msgResponse.getPayload();
TransactionLogger.debug("correlationId + msgResponse.getHeaders().get("correlationId"));
TransactionLogger.debug("InquiryService inquire response = " + response.toString());
return response;
}
//snippet from aggregator
@Aggregator
public <T> InquiryResponse aggregate(List<Message> serviceResponses) {
InquiryResponse response = new InquiryResponse();
serviceResponses.forEach(serviceResponse -> {
Object payload = serviceResponse.getPayload();
if (payload instanceof AMatchResponse) {
response.setA(((AMatchResponse) payload).getA());
} else if (payload instanceof BValueResponse) {
response.setB(((BValueResponse) payload).getB());
} else if (payload instanceof BError) {
response.setB(new B().addBErrorsItem((BError) payload));
} else if (payload instanceof AError) {
response.setA(new A().AError((AError) payload));
} else {
transactionLogger.warn("Unknown message type received. This message will not be aggregated into the response. ||| model=" + payload.getClass().getName());
}
});
transactionLogger.debug("OurAggregator.response = " + response.toString());
return response;
}