0

我有一个场景,我需要并行调用 A 和 B 系统的 REST 调用并聚合响应并转换为单个FinalResponse.

为此,我使用了 Spring Integration 拆分器和聚合器,配置如下所示。

我已经公开了一个 REST 端点,当请求(请求在标头中有 co-relationId)到达控制器时,我们调用网关并且拆分器将请求发送到 A 和 B 通道。服务激活器 A 监听通道 A 并调用 A 系统的 REST 调用,服务激活器 B 监听 B 通道并调用 B 系统的 REST 调用。然后我需要汇总来自 A 和 B 系统的响应,然后将其转换为FinalResponse. 目前聚合和转换工作正常。

有时当多个请求到达控制器FinalResponse时,与对控制器的单个请求相比,需要更多时间。对请求的所有响应几乎同时出现,不知道为什么(即使对控制器的最后一个请求是在第一个请求后 6-7 秒发送的)。我的与线程相关的配置有问题吗?不知道为什么当多个请求到达控制器时需要更多时间来响应。另外,我没有使用任何 CorrelationStrategy,我们需要使用它吗?使用以下配置,我会在多线程环境中遇到任何问题吗? 有关配置的任何反馈都会有所帮助

// Controller

     {
     FinalResponse aggregatedResponse = gateway.collateServiceInformation(inputData);

     }

     //Configuration 

     @Autowired
     Transformer transformer;

     //Gateway
     @Bean
        public IntegrationFlow gatewayServiceFlow() {
            return IntegrationFlows.from("input_channel")
                    .channel("split_channel").get();
        }

//splitter
     @Bean
     public IntegrationFlow splitAggregatorFlow() {
            return IntegrationFlows.from("split_channel").
                    .split(SomeClass.class, SomeClass::getResources)
                    .channel(c -> c.executor(Executors.newCachedThreadPool()))
                    .<Resource, String>route(Resource::getName,
                            mapping -> mapping.channelMapping("A", "A")
                                    .channelMapping("B", "B"))
                    .get();

        }

//aggregator
        @Bean
        public IntegrationFlow aggregateFlow() {
            return IntegrationFlows.from("aggregate_channel").aggregate()
                    .channel("transform_channel").transform(transformer).get();
        }
        .
        .
        .
    //Transformer   
    @Component
    @Scope("prototype")
    public class Transformer {

     @Transformer
        public FinalResponse transform(final List<Result> responsesFromAAndB) {
        //transformation logic and then return final response
       }
    }
4

1 回答 1

0

拆分器为标头中的相关详细信息提供默认策略。聚合器将在之后使用它们。你所说的被称为scatter-gatherhttps ://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/messaging-routing-chapter.html#scatter-gather 。有一个 Java DSL 等价物。

我认为您的问题是拆分集中的某些请求失败,所以聚合器无法完成该请求的组。到目前为止,您的配置中没有什么明显的...

于 2018-09-28T20:02:43.813 回答