2

我的用例需要使用 smne 数据丰富我的输入并将其发送到出站端点。

通过调用两个 Web 服务获取用于丰富的数据,然后从回复中提取数据。此提取的数据被丰富到我的输入 XML 中并发送到出站端点。

我需要进行的两个 Web 服务调用需要并行,因为它们不依赖于另一个。这样我可以节省我的处理时间。

请建议我如何在 Mule 的流程中实现这种并行处理。

注意:我尝试过使用 ALL 流控制,但似乎是按顺序调用 Web 服务(子流)。

下面给出的是我的抽象流程。

<flow name="mainFlow">
    <inbound-endpoint> .....

    <some validation>

    <setting some flow variables>

    <!-- Now make calls to the sub-flows which has some processing of the input and make some web-service calls -->
    <all>
        <flow-ref name="myFlow1" />
        <flow-ref name="myFlow2" />
        <flow-ref name="myFlow3" />
    </all>

    <enrich the input with the data obtained from the output of the above three flows>

    <outbound-endpoint>
</flow>



<flow name="myFlow1">
    <some transformer to transform the payload provided >

    < the tran sformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>



<flow name="myFlow2">
    <some transformer to transform the payload provided >

    < the tran sformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>



<flow name="myFlow3">
    <some transformer to transform the payload provided to it>

    < the tran sformed payload is passed as input to the web-service call>

    <http:outbound-endpoint ...>

    <transform the reply from the web-service call>
</flow>
4

1 回答 1

2

这是一个简单的配置,展示了一种使用两个 HTTP 出站端点进行分叉/加入的方法。要添加第三个端点,请将第三个端点设置MULE_CORRELATION_GROUP_SIZE3和。MULE_CORRELATION_SEQUENCEasync flow-ref3

<flow name="fork">
    <vm:inbound-endpoint path="fork.in" />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="2" />
    <all enableCorrelation="IF_NOT_SET">
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="1" />
            <flow-ref name="parallel1" />
        </async>
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                value="2" />
            <flow-ref name="parallel2" />
        </async>
    </all>
</flow>

<sub-flow name="parallel1">
    <logger level="INFO" message="parallel1: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel1: processing finished" />
    <flow-ref name="join" />
</sub-flow>

<sub-flow name="parallel2">
    <logger level="INFO" message="parallel2: processing started" />
    <http:outbound-endpoint address="..."
        exchange-pattern="request-response" />
    <logger level="INFO" message="parallel2: processing finished" />
    <flow-ref name="join" />
</sub-flow>

<sub-flow name="join">
    <collection-aggregator timeout="6000"
        failOnTimeout="true" />
    <combine-collections-transformer />
    <logger level="INFO"
        message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
</sub-flow>

编辑:在上面的配置中,聚合器在 6 秒后超时。对于您的实际用例来说,这可能太短了:根据需要增加。此外,它设置为超时失败,如果并非所有出站 HTTP 端点交互都成功,这可能不是您想要的行为:由您根据您的用例决定。

于 2012-12-29T00:10:07.453 回答