1

我有一个 Mule 3.3.0 流程,可以将文件拆分为记录。我需要在所有记录完成处理后执行一个操作(存储过程)。问题是有时在 Mule 处理完所有记录之前执行操作。我认为这是因为 Mule 并行处理东西,这很棒,所以有时最后的动作被调用得太早了。如果我将流程设置为同步,事情似乎可以工作,但我没有利用并行执行。我想我也可以使用 Foreach 范围(没有尝试过),但我想这些东西仍然不会被并行化。有没有办法“等待”直到所有记录完成处理?

我附上了一个非常简单的流程来展示这种行为。如果您运行它,您将看到记录器不会按顺序打印内容。实际上,“DONE”消息在其余消息之前被记录。该流程处理一个简单的 csv 文件,直到它匹配一个值为“end”的字段。当找到此类字段时,有一个选择组件会记录“完成”。其余字段只需记录即可。

任何帮助将不胜感激。

流动:

在此处输入图像描述

流xml

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.3.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd ">

<file:connector name="inputFileConnector" autoDelete="true"
    streaming="false" validateConnections="true" doc:name="File" fileAge="60000"
    readFromDirectory="#{systemProperties['user.home']}" />

<flow name="flow1" doc:name="flow1" processingStrategy="synchronous">
    <file:inbound-endpoint path="#{systemProperties['user.home']}"
        responseTimeout="10000" doc:name="Input File" fileAge="100"
        connector-ref="inputFileConnector">
        <file:filename-regex-filter pattern="input.csv"
            caseSensitive="false" />
    </file:inbound-endpoint>
    <byte-array-to-string-transformer
        doc:name="Byte Array to String" />
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[return payload.split('\n');]]></scripting:text>
        </scripting:script>
    </scripting:component>
    <collection-splitter doc:name="Collection Splitter" />
    <choice doc:name="Choice">
        <when expression="#[groovy:payload != 'end']">
            <processor-chain>
                <logger message="." level="INFO" doc:name="Process"/>
                <vm:outbound-endpoint path="toFlow2" doc:name="VM"/>
            </processor-chain>
        </when>
        <otherwise>
            <processor-chain>
                <logger message="|||| DONE" level="INFO" doc:name="DONE"/>
            </processor-chain>
        </otherwise>
    </choice>
</flow>

<flow name="flow2" doc:name="flow2" >
    <vm:inbound-endpoint path="toFlow2" doc:name="VM"/>
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[return payload.split(',');]]></scripting:text>
        </scripting:script>
    </scripting:component>
    <collection-splitter doc:name="Collection Splitter" />
    <logger message="|||||| #[payload]" level="INFO" doc:name="Logger"/>
    <vm:outbound-endpoint path="toFlow3" doc:name="VM"/>
</flow>

4

1 回答 1

1

一种选择是使用 acollection-aggregator作为累加器,阻止最终流操作,直到所有消息都已处理。诀窍是collection-splitters 将设置一个相关组大小,该大小仅适用于文件中的行数或文件中的列数。但是我们要累积,直到所有行的所有列都被处理完。解决方案在于首先计算该值(即预期消息的总数)并用总值覆盖已计算的任何相关组大小collection-splitter

以下是我的做法(您会注意到,我用更多的 Mule-3 式 MEL 表达式替换了所有 Groovy 代码段):

<file:connector name="inputFileConnector" autoDelete="true"
    streaming="false" validateConnections="true" fileAge="60000"
    readFromDirectory="#{systemProperties['user.home']}" />

<flow name="flow1" processingStrategy="synchronous">
    <file:inbound-endpoint path="#{systemProperties['user.home']}"
        responseTimeout="10000" fileAge="100"
        connector-ref="inputFileConnector">
        <file:filename-regex-filter pattern="input.csv"
            caseSensitive="false" />
    </file:inbound-endpoint>
    <byte-array-to-string-transformer />
    <set-session-variable variableName="expectedMessageCount"
                          value="#[org.mule.util.StringUtils.countMatches(message.payload, '\n') + org.mule.util.StringUtils.countMatches(message.payload, ',') - 1]" />
    <expression-transformer expression="#[message.payload.split('\n')]" />
    <collection-splitter enableCorrelation="IF_NOT_SET" />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                  value="#[sessionVars.expectedMessageCount]" />
    <choice>
        <when expression="#[message.payload != 'end']">
            <processor-chain>
                <logger message="." level="INFO" />
                <vm:outbound-endpoint path="toFlow2" />
            </processor-chain>
        </when>
        <otherwise>
            <processor-chain>
                <logger message="|||| END" level="INFO" />
            </processor-chain>
        </otherwise>
    </choice>
</flow>

<flow name="flow2">
    <vm:inbound-endpoint path="toFlow2"/>
    <expression-transformer expression="#[message.payload.split(',')]" />
    <collection-splitter />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                  value="#[sessionVars.expectedMessageCount]" />
    <logger message="|||||| #[message.payload]" level="INFO"/>
    <vm:outbound-endpoint path="toFinalizer" />
    <vm:outbound-endpoint path="toFlow3" />
</flow>

<flow name="finalizer">
    <vm:inbound-endpoint path="toFinalizer" />
    <collection-aggregator />
    <logger message="|||| DONE" level="INFO" />
</flow>

注意。或者,如果使用 acollection-aggregator是一个问题,因为它使用了太多内存,您可以使用表达式组件来递减sessionVars.expectedMessageCount和过滤,以在计数器返回 0 时让消息到达最终消息处理器。

于 2012-10-08T18:38:20.607 回答