3

要求是开发并行调用 3 个不同同步服务的 mule 流,然后汇总每个同步服务的响应并将其发送回调用者。

我遵循了文档和How to make parallel outbound calls中提到的 fork join 方法。我的配置文件如下所示:

            <flow name="fork">
            <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response">
            <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                value="2" />
            <all enableCorrelation="IF_NOT_SET">
                <async>
                    <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                        value="1" />
                    <flow-ref name="parallel1" />
                </async>
                <async>
                    <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                        value="2" />
                    <flow-ref name="parallel2" />
                </async>
            </all>
        </flow>

        <sub-flow name="parallel1">
            <logger level="INFO" message="parallel1: processing started" />
            <!- Transformation payload -->
            <http:outbound-endpoint address="..."
                exchange-pattern="request-response" />
            <logger level="INFO" message="parallel1: processing finished" />
            <flow-ref name="join" />
        </sub-flow>

        <sub-flow name="parallel2">
            <logger level="INFO" message="parallel2: processing started" />
            <!- Transformation payload -->
            <http:outbound-endpoint address="..."
                exchange-pattern="request-response" />
            <logger level="INFO" message="parallel2: processing finished" />
            <flow-ref name="join" />
        </sub-flow>

        <sub-flow name="join">
            <collection-aggregator timeout="6000"
                failOnTimeout="true" />
            <combine-collections-transformer />
            <logger level="INFO"  message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
            <set-payload value="Soap XML Response"/>
        </sub-flow>

我能够验证直到“加入”子流工作正常,但响应没有作为“Soap XML 响应”返回。响应是相同的初始 SOAP 请求。

我怎样才能让这个线程等到子流处理完成,然后无论“加入”子流返回什么,它都会发回响应?

4

1 回答 1

5

上面帖子中的分叉连接看起来不错。这里的问题是无法在加入后捕获有效负载并将其带回主流。

由于对并行的调用进行了异步,因此主要流程继续进行,而无需等待连接输出。

我已经修改了流程来解决这个问题。现在流程将有一个处理器等待回复并读取要写入 http 转换器的连接输出。

    <flow name="fork">
        <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response">
            <!-- To get back the response after the fork-join -->
        <request-reply timeout="60000">
            <jms:outbound-endpoint queue="parallel.processor.queue">
                <message-properties-transformer scope="outbound">
                    <delete-message-property key="MULE_REPLYTO" />
                </message-properties-transformer>
            </jms:outbound-endpoint>
            <jms:inbound-endpoint queue="join.queue" >      
            </jms:inbound-endpoint>
        </request-reply>            
    </flow>

    <flow name="fork_join_flow" >
        <jms:inbound-endpoint queue="parallel.processor.queue" exchange-pattern="one-way" />        
        <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                value="2" />
        <all enableCorrelation="IF_NOT_SET">
            <async>
                <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                    value="1" />
                <flow-ref name="parallel1" />
            </async>
            <async>
                <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                    value="2" />
                <flow-ref name="parallel2" />
            </async>
        </all>
    </flow>

    <sub-flow name="parallel1">
        <logger level="INFO" message="parallel1: processing started" />
        <!- Transformation payload -->
        <http:outbound-endpoint address="..."
            exchange-pattern="request-response" />
        <logger level="INFO" message="parallel1: processing finished" />
        <flow-ref name="join" />
    </sub-flow>

    <sub-flow name="parallel2">
        <logger level="INFO" message="parallel2: processing started" />
        <!- Transformation payload -->
        <http:outbound-endpoint address="..."
            exchange-pattern="request-response" />
        <logger level="INFO" message="parallel2: processing finished" />
        <flow-ref name="join" />
    </sub-flow>

    <sub-flow name="join">
        <collection-aggregator timeout="6000"
            failOnTimeout="true" />
        <combine-collections-transformer />
        <logger level="INFO"  message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
        <set-payload value="Soap XML Response"/>
        <jms:outbound-endpoint queue="join.queue">              
        </jms:outbound-endpoint>
    </sub-flow>

希望这可以帮助。

于 2013-05-29T17:30:22.953 回答