1

我正在使用带有定义的 IntegrationFlow 的 Spring Integration Java DSL。我看到响应缺少数据片段并且聚合器响应中的correlationId 与调用服务接收到的响应中的值不匹配的行为。

背景:

我在使用随机数据且以每分钟 600 个请求运行的服务器上运行 JMeter 性能测试。在我的笔记本电脑上,我有一个 SoapUI 性能测试正在运行,它击中了同一台服务器。SoapUI 项目以每分钟 60 个请求的速率发送具有相同搜索条件(我们正在匹配)的请求。响应都应包含相同的结果数据。

大约 0.5% 的时间响应返回数据丢失。在这些响应中,从聚合器记录的响应的correlationId 和从调用服务记录的响应的correlationId(在响应返回到调用服务并且已经通过聚合器后记录)不匹配。

知道有什么问题吗?请参阅下面的代码片段。

@Configuration
@EnableAutoConfiguration
@Import(.....AServiceConfig.class)
public class ServiceConfig {

@Bean(name = "inputChannel")
public DirectChannel inputChannel() {
    return new DirectChannel();
}

@Bean(name = "outputChannel")
public QueueChannel outputChannel() {
    return new QueueChannel();
}

@Bean(name = "transactionLogger")
public ourLogger ourTransactionLogger() {
    return OurLoggerFactory.getLogger("ourAppTrx", new ourLoggerConfig(ourTransactionLoggerKey.values()));
}

public IntegrationFlow ourFlow() {
     return IntegrationFlows.from(inputChannel())
            .split(splitter(ourTransactionLogger()))
            .channel(MessageChannels.executor(getExecutor()))
            .handle(ourServiceActivator, "service")
            .aggregate(t -> t.processor(ourAggregator, AGGREGATE))
            .channel(outputChannel())
            .get();
}

@Bean(name = "executor")
public Executor getExecutor()
{
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}
}
//snippet from calling service

public InquiryResponse inquire(InquiryRequest request) {

    inputChannel.send(MessageBuilder.withPayload(request).build());
    Message<?> msgResponse = outputChannel.receive();   
    InquiryResponse response = (InquiryResponse) msgResponse.getPayload();
    TransactionLogger.debug("correlationId + msgResponse.getHeaders().get("correlationId"));
    TransactionLogger.debug("InquiryService inquire response = " + response.toString());

    return response;
}

//snippet from aggregator

@Aggregator
public <T> InquiryResponse aggregate(List<Message> serviceResponses) {
    InquiryResponse response = new InquiryResponse();
     serviceResponses.forEach(serviceResponse -> {
            Object payload = serviceResponse.getPayload();

            if (payload instanceof AMatchResponse) {
                response.setA(((AMatchResponse) payload).getA());
            } else if (payload instanceof BValueResponse) {
                response.setB(((BValueResponse) payload).getB());
            } else if (payload instanceof BError) {
                 response.setB(new B().addBErrorsItem((BError) payload));
            } else if (payload instanceof AError) {
                response.setA(new A().AError((AError) payload));
            } else {
                transactionLogger.warn("Unknown message type received. This message will not be aggregated into the response. ||| model=" + payload.getClass().getName());
            }

    });
    transactionLogger.debug("OurAggregator.response = " + response.toString());
    return response;
}   
4

0 回答 0