1

我对 Mule 相当陌生,使用 3.3.0,但我正在尝试我认为应该是一个相当普通的示例。我有一个 mule 配置,它将读取一个 csv 文件并尝试异步处理不同流中的行和列。但是,当消息被“移交”到其中一个异步流时,我们会看到 ConcurrentModificationException。我想知道是否有其他人看到过这个问题,以及他们可能采取了哪些措施来解决这个问题。

java.util.ConcurrentModificationException at org.apache.commons.collections.map.AbstractHashedMap$HashIterator.nextEntry(AbstractHashedMap.java:1113) at org.apache.commons.collections.map.AbstractHashedMap$KeySetIterator.next(AbstractHashedMap.java:938) ) 在 org.mule.DefaultMuleEvent.setMessage(DefaultMuleEvent.java:933) 在 org.mule.DefaultMuleEvent.(DefaultMuleEvent.java:318) 在 org.mule.DefaultMuleEvent.(DefaultMuleEvent.java:290) 在 org.mule.DefaultMuleEvent .copy(DefaultMuleEvent.java:948)

 <queued-asynchronous-processing-strategy poolExhaustedAction="RUN" name="commonProcessingStrategy" maxQueueSize="1000" doc:name="Queued Asynchronous Processing Strategy"/>

<file:connector name="inboundFileConnector" fileAge="1000" autoDelete="true" pollingFrequency="1000" workDirectory="C:/mule/orca/dataprovider/work"/>

<file:endpoint name="dataProviderInbound" path="C:\mule\orca\dataprovider\inbound" moveToPattern="#[function:datestamp]-#[header:originalFilename]" moveToDirectory="C:\mule\orca\dataprovider\history" connector-ref="inboundFileConnector" doc:name="Data Feed File" doc:description="new files are processed in 'work' folder, then moved to 'archive' folder"/>

<flow name="dataProviderFeedFlow">
    <inbound-endpoint ref="dataProviderInbound"/>
    <file:file-to-string-transformer />
    <flow-ref name="dataSub"/>
</flow>

<sub-flow name="dataSub" >

    <splitter expression="#[rows=org.mule.util.StringUtils.split(message.payload, '\n\r')]" />
    <expression-transformer expression="#[org.mule.util.StringUtils.split(message.payload, ',')]" />
    <foreach>

        <flow-ref name="storageFlow" />
        <flow-ref name="id" />            

     </foreach>

</sub-flow>

<flow name="storageFlow" processingStrategy="commonProcessingStrategy">
    <logger level="INFO" message="calling the 'storageFlow' sub flow."/>
</flow>

<flow name="id" processingStrategy="commonProcessingStrategy">
    <logger level="INFO" message="calling the 'id' sub flow."/>
</flow>
4

1 回答 1

4

dataSub这是工作正常的子流程的固定版本:

<sub-flow name="dataSub">
    <splitter expression="#[org.mule.util.StringUtils.split(message.payload, '\n\r')]" />
    <splitter expression="#[org.mule.util.StringUtils.split(message.payload, ',')]" />
    <flow-ref name="storageFlow" />
    <all>
        <async>
            <flow-ref name="storageFlow" />
        </async>
        <async>
            <flow-ref name="id" />
        </async>
    </all>
</sub-flow>

请注意:

  • 我使用两个拆分器表达式,
  • 我使用all消息处理器来确保将相同的有效负载发送到两个私有流,
  • 我必须用async消息处理器包装流引用,否则调用会失败,因为私有流是异步的但all强制同步。
于 2013-03-05T19:44:49.897 回答