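Using a request-reply router with a collection splitter/aggregator, I have successfully split a set of messages asynchronously across workers and then merged the results back together with the aggregator.

Now I want to put a (synchronous) loop on top of that, so I wrap the existing setup in either a Foreach message processor or another splitter/aggregator pair (and yes, I do save those properties as invocation-scoped properties and restore them afterwards).

I can see the first iteration make it through the aggregator, but the VM inbound endpoint in the request-reply router never receives anything, so the flow hangs. I have tried many things and nothing helped. Any idea why?

I have two String arrays, sa: {11, 12, 13} and sb: {21, 22, 23}, held in an ArrayList AL. I want to loop over AL synchronously and, for each String array, run the split/aggregate asynchronously.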
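A rough sketch of the intended control flow, in plain Python rather than Mule, may make the goal easier to see. The names `do_work` and `process_all` are made up for illustration, and the two-thread pool only mirrors the `maxThreads="2"` setting in the configuration; nothing here reflects how Mule schedules work internally.

```python
from concurrent.futures import ThreadPoolExecutor


def do_work(msg):
    """Hypothetical stand-in for the real worker flow."""
    return "done " + msg


def process_all(batches, max_threads=2):
    """Handle batches one at a time; fan each batch out to a small pool."""
    results = []
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        for batch in batches:  # outer loop: synchronous, one batch at a time
            # inner step: split the batch across workers, then wait for
            # every result before moving on (the "aggregate" step)
            results.append(list(pool.map(do_work, batch)))
    return results


al = [["11", "12", "13"], ["21", "22", "23"]]
results = process_all(al)
```

Each inner list in `results` corresponds to one array in `al`, and the second array is not started until the first one has been fully collected.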

Any help is much appreciated.

Suri

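David, thanks.
I put a logger after the request-reply router and the flow never hits it. I also have a logger after the collection aggregator, and that one does get hit.

Here is the XML configuration: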

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd ">
    <queued-asynchronous-processing-strategy name="all2thread" maxThreads="2" doc:name="Queued Asynchronous Processing Strategy"/>
    <flow name="splitertest2Flow1" doc:name="splitertest2Flow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8581" doc:name="HTTP"/>
        <expression-filter expression="#[groovy:!payload.contains('.ico')]" doc:name="Expression"/>
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String [] sa = new String[3];
sa[0]="message ... 11";
sa[1]="message ... 12";
sa[2]="message ... 13";

String [] sb = new String[3];
sb[0]="message ... 21";
sb[1]="message ... 22";
sb[2]="message ... 23";
ArrayList al = new ArrayList();
al.add(sa);
al.add(sb);
message.setPayload(al);
return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <foreach doc:name="Foreach">
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">  
                <message-properties-transformer scope="outbound"> 
                    <delete-message-property key="MULE_REPLYTO"/> 
                </message-properties-transformer> 
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT"></vm:inbound-endpoint>
         </request-reply>
            <logger message="******** Almost there....." level="INFO" doc:name="Logger"/>
        </foreach>
        <logger message="**************  Very Happy to get here **********************" level="INFO" doc:name="Logger"/>
    </flow>
    <flow name="splitertest2Flow2" doc:name="splitertest2Flow2">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.IN" doc:name="VM"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <flow-ref name="DoWork2" doc:name="DoWork2"/>
    </flow>
    <flow name="DoWork2" doc:name="DoWork2" processingStrategy="all2thread">
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String msg = message.getPayload();
println "processing..."+msg;
Thread.sleep(1500);
println "exit..."+msg;
return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
    </flow>
    <flow name="splitertest2Flow3" doc:name="splitertest2Flow3" processingStrategy="all2thread">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
        <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
        <logger message="************ after aggregator  ************" level="INFO" doc:name="Logger"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.OUT" doc:name="VM"/>
    </flow>
</mule>

1 Answer


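Two issues prevented this from working:

  • The foreach element uses the same correlation ID for every message it creates, which completely confuses the collection-aggregator downstream. The aggregator works by grouping on this ID, and since it is identical for all six messages, the grouping breaks. To fix this, I had to assign a new correlation ID as the first step inside foreach.
  • The request-reply router computes an "async reply correlation ID" that must be used when dispatching to the reply queue (work.OUT). Usually this "async reply correlation ID" is equal to the message correlation ID, but not in this case (I suspect because we are behind a foreach). To fix this, I had to store this asyncReplyCorrelationId in a session variable and re-establish it as the correlation ID before dispatching to the reply queue.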
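Both points come down to matching on an ID, which a toy model in plain Python can illustrate. This is only a sketch under stated assumptions, not Mule's implementation: `ToyAggregator`, `ToyReplyWaiter` and `aggregate` are hypothetical names, the IDs are made-up values, and the two batches are assumed to arrive interleaved purely to make a shared ID visibly mix them.

```python
import uuid
from collections import defaultdict


class ToyAggregator:
    """Groups payloads by correlation ID; releases a group once it is full."""

    def __init__(self, group_size):
        self.group_size = group_size
        self.groups = defaultdict(list)

    def add(self, correlation_id, payload):
        group = self.groups[correlation_id]
        group.append(payload)
        if len(group) == self.group_size:
            return self.groups.pop(correlation_id)
        return None


def aggregate(batches, ids):
    """Feed batch elements to one aggregator, interleaved across batches,
    tagging each element with its batch's correlation ID."""
    aggregator = ToyAggregator(group_size=len(batches[0]))
    released = []
    for elements in zip(*batches):  # arrival order: 11, 21, 12, 22, 13, 23
        for correlation_id, payload in zip(ids, elements):
            group = aggregator.add(correlation_id, payload)
            if group is not None:
                released.append(group)
    return released


class ToyReplyWaiter:
    """Tracks the reply IDs that a caller is currently waiting on."""

    def __init__(self):
        self.pending = {}

    def expect(self, reply_id):
        self.pending[reply_id] = None

    def deliver(self, correlation_id, payload):
        if correlation_id not in self.pending:
            return False  # nobody is waiting on this ID, so the caller hangs
        self.pending[correlation_id] = payload
        return True


sa = ["11", "12", "13"]
sb = ["21", "22", "23"]

# One shared ID: the aggregator cannot tell the two batches apart.
mixed = aggregate([sa, sb], ids=["same-id", "same-id"])
# A fresh ID per batch: each group contains exactly one batch.
separate = aggregate([sa, sb], ids=[str(uuid.uuid4()), str(uuid.uuid4())])

# The caller waits on correlation ID + sequence (hypothetical values).
waiter = ToyReplyWaiter()
waiter.expect("abc" + "1")
lost = waiter.deliver("abc", "result")    # reply tagged with the plain ID
found = waiter.deliver("abc1", "result")  # reply tagged with the expected ID
```

With the shared ID, the released groups mix elements of `sa` and `sb`; with one fresh ID per batch they come back intact. In the same way, a reply tagged with the plain correlation ID is never picked up by a caller that is waiting on a different reply ID.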

Here is the full working configuration:

<queued-asynchronous-processing-strategy
    name="all2thread" maxThreads="2" />

<flow name="splitertest2Flow1">
    <http:inbound-endpoint exchange-pattern="request-response"
        host="localhost" port="8581" />
    <expression-filter expression="#[groovy:!payload.contains('.ico')]" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String [] sa = new String[3];
                sa[0]="message ... 11";
                sa[1]="message ... 12";
                sa[2]="message ... 13";

                String [] sb = new String[3];
                sb[0]="message ... 21";
                sb[1]="message ... 22";
                sb[2]="message ... 23";
                ArrayList al = new ArrayList();
                al.add(sa);
                al.add(sb);
                message.setPayload(al);
                return message;
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
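    <foreach>
        <!-- Fix 1: give each iteration its own correlation ID so the
             collection-aggregator downstream can keep the batches apart. -->
        <scripting:transformer>
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[
                message.correlationId = UUID.randomUUID().toString()
                return message
             ]]></scripting:text>
            </scripting:script>
        </scripting:transformer>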
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">
                <message-properties-transformer
                    scope="outbound">
                    <delete-message-property key="MULE_REPLYTO" />
                </message-properties-transformer>
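                <!-- Fix 2, part 1: remember the ID the request-reply waits on
                     by storing it in a session variable. -->
                <message-properties-transformer
                    scope="session">
                    <add-message-property key="asyncReplyCorrelationId"
                        value="#[message.correlationId + message.correlationSequence]" />
                </message-properties-transformer>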
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT" />
        </request-reply>
        <logger message="******** Almost there....." level="INFO" />
    </foreach>
    <logger message="**************  Very Happy to get here **********************"
        level="INFO" />
</flow>

<flow name="splitertest2Flow2">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.IN" />
    <collection-splitter />
    <flow-ref name="DoWork2" />
</flow>

<flow name="DoWork2">
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String msg = message.getPayload();
                println "processing..."+msg;
                Thread.sleep(1500);
                println "exit..."+msg;
                return message;
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
</flow>

<flow name="splitertest2Flow3" processingStrategy="all2thread">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
    <collection-aggregator failOnTimeout="true" />
    <logger message="************ after aggregator  ************"
        level="INFO" />
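    <!-- Fix 2, part 2: restore the stored asyncReplyCorrelationId as the
         correlation ID before dispatching to the reply queue. -->
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                message.correlationId = asyncReplyCorrelationId
                return message
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>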
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.OUT" />
</flow>
answered 2013-05-01T17:41:05.680