2

我们正在尝试使用 scatter-gather 对不同的收件人进行并行调用,它工作正常。但是除非第一个收件人完成(在 Zipkin 中跟踪),否则第二个收件人流程不会开始。有没有办法让所有接收者异步..非常类似于带有执行器通道的拆分聚合。

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

flow2(),flow3(),flow4() 方法是具有InterationFlow作为返回类型的方法。

示例代码flow2()

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }
4

1 回答 1

1

这确实是可能的executor channel。您所有的收件人流程都必须真正从ExecutorChannel. 在您的情况下,您必须将它们全部修改为以下内容:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

注意IntegrationFlows.from(MessageChannels.executor(taskExexecutor())). 这正是您可以使每个子流异步的方式。

更新

IntegrationFlow对于没有改进子流程的旧 Spring Integration 版本,我们可以这样做:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

这类似于您在上面的评论中显示的内容。

于 2018-07-25T19:21:47.323 回答