0

我想使用 VM 端点在 mule 流中实现并行处理。作为 mule 的初学者,我不太确定这样做的含义。我在 mule 3 中阅读了有关私有流的信息,但不确定在这种情况下是否可以用私有流替换 vm 端点,以及是否可以从中获得任何优势。有人可以告诉我使用 VM 的优缺点。这是我想用于并行处理的示例。

<flow name="forkAndJoinFlow">
    <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="81" path="lowestprice" />
    <not-filter>
        <wildcard-filter pattern="*favicon*" />
    </not-filter>
    <request-reply>
        <all>
            <vm:outbound-endpoint path="shop1"/>
            <vm:outbound-endpoint path="shop2"/>
        </all>
        <vm:inbound-endpoint path="response">
            <message-properties-transformer>
                <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
            </message-properties-transformer>
            <collection-aggregator />
        </vm:inbound-endpoint>
    </request-reply>
    <expression-transformer evaluator="groovy" expression="java.util.Collections.min(payload)" />
    <object-to-string-transformer/>
    <logger level="WARN" message="#[string:Lowest price: #[payload]]" />
</flow>

<flow name="shop1Flow">

    <vm:inbound-endpoint path="shop1"/>
    <logger level="INFO" message="SHOP1 Flow..." />
    <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
    <logger level="WARN" message="#[string:Price from shop 1: #[payload]]" />
</flow>

<flow name="shop2Flow">

    <vm:inbound-endpoint path="shop2" />
    <logger level="INFO" message="SHOP2 Flow..." />
    <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
    <logger level="WARN" message="#[string:Price from shop 2: #[payload]]" />
</flow>

4

2 回答 2

0

vm:endpointsone-wayasynchronous,这意味着您不会收到他们的回复。

Private flowscan be synchronousas well as表示如果是asynchronous,他们也可以返回响应。exchange-patternrequest-response

此外,在private flow你得到所有variables等而不是headers当你将你的消息推送到vm或任何其他JMS时,variables并且inbound properties没有传播并且调用者流变成被调用流outbound propertiesinbound

于 2013-11-07T10:39:38.933 回答
0

Private flows可以用来替代您的VM.

正如所解释的那样private flows ,可以SynchronusAsynchronous基于processingStrategy流的。

我的情况是,您以异步方式使用私有流来实现并行处理,然后确保在每个私有流结束时将响应发布回响应 VM。

我也使用子流实现了并行处理的 fork-join 模式。尝试这个。

    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="1" />
    <all enableCorrelation="IF_NOT_SET">
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE" value="1" />
            <flow-ref name="parallel-flow1"></flow-ref>
        </async>
        <async>
            <set-property propertyName="MULE_CORRELATION_SEQUENCE"  value="2" />
            <flow-ref name="parallel-flow2"></flow-ref>
        </async>
    </all>

子流程如下。

    <sub-flow name="parallel-flow1">    
        ....
        ....
        <flow-ref name="join-flow" />       
    </sub-flow>

    <sub-flow name="parallel-flow2">    
        ....
        ....
        <flow-ref name="join-flow" />       
    </sub-flow>

    <sub-flow name="join-flow">
        <collection-aggregator  timeout="10000"   failOnTimeout="true"  />          
        <combine-collections-transformer />
        ....
        ....
    </sub-flow>

您可以尝试使用私有流程。

希望这可以帮助。

于 2013-11-07T12:02:19.437 回答