1

我需要收集和处理由另一个组织生成的文件集。为简单起见,假设该集合由两个文件组成,一个摘要文件和一个详细文件,名称如下:SUM20150701.dat 和 DTL20150701.dat,这将构成日期为 20150701 的集合。问题是,集合需要按顺序处理,并且从外部组织传输文件可能容易出错,从而可能丢失文件。如果发生这种情况,则应保留该文件集,找到的任何以下文件集也应保留。例如,在 mule 进程开始时,源文件夹中可能包含:SUM20150701.dat、SUM20150703.dat、DTL20150703.dat。也就是说,20150701 的数据集是不完整的,而 20150703 是完整的。我需要保留两个数据集,直到 DTL20150701.dat 到达,然后按顺序处理它们。

在我的 mule 进程的这种简化形式中,会监视源文件夹中的文件。找到后,它们将被移动到存档文件夹并使用日期作为序列和相关值传递给集合聚合器。当一组完成时,它被移动到目标文件夹。在收集器上使用长时间超时以确保不处理不完整的集合:

<file:connector name="File" autoDelete="false" streaming="false" validateConnections="true" doc:name="File">
    <file:expression-filename-parser />
</file:connector>

<file:connector name="File1" autoDelete="false" outputAppend="true" streaming="false" validateConnections="true" doc:name="File" />

<vm:connector name="VM" validateConnections="true" doc:name="VM">
    <receiver-threading-profile maxThreadsActive="1"></receiver-threading-profile>
</vm:connector>

<flow name="fileaggreFlow2" doc:name="fileaggreFlow2">
    <file:inbound-endpoint path="G:\SourceDir" moveToDirectory="g:\SourceDir\Archive" connector-ref="File1" doc:name="get-working-files"                            
             responseTimeout="10000" pollingFrequency="5000" fileAge="600000" >
        <file:filename-regex-filter pattern="DTL(.*).dat|SUM(.*).dat" caseSensitive="false"/>
    </file:inbound-endpoint>

    <message-properties-transformer overwrite="true" doc:name="Message Properties">
        <add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]"/>
        <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
        <add-message-property key="MULE_CORRELATION_SEQUENCE" value="#[message.inboundProperties.originalFilename.substring(5, message.inboundProperties.originalFilename.lastIndexOf('.'))]"/>       
    </message-properties-transformer>

    <vm:outbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>
</flow>


<flow name="fileaggreFlow1" doc:name="fileaggreFlow1" processingStrategy="synchronous">
    <vm:inbound-endpoint exchange-pattern="one-way" path="Merge" doc:name="VM" connector-ref="VM"/>

    <processor-chain doc:name="Processor Chain">
        <collection-aggregator timeout="1000000" failOnTimeout="true" doc:name="Collection Aggregator"/>

        <foreach doc:name="For Each">
            <file:outbound-endpoint path="G:\DestDir1" outputPattern="#[function:datestamp:yyyyMMdd.HHmmss].#[message.inboundProperties.originalFilename]" responseTimeout="10000" connector-ref="File1" doc:name="Destination"/>
        </foreach>
    </processor-chain>

如果所有集合都完整,这将正确处理按顺序找到的集合。它正确地等待不完整的集合被填充,但不保存后续集合,即在上面的示例中,集合 20150703 将处理,而 20150701 仍在等待 DTL 文件。

如果存在未完成的较早集合,是否存在强制集合聚合器元素等待的设置或其他构造?

我将文件名的日期部分用于相关性和序列 ID,如果所有集合都完成,它确实可以按照我想要的顺序控制集合过程。如果日期不存在(如本例中的 20150702)并不重要,只需按顺序处理现有文件并且集合必须完整。

4

1 回答 1

0

最后,我无法让 Collection-Aggregator 做到这一点。为了克服这个问题,我构建了一个 Java 类,其中包含 SUM 和 DTL 文件的映射,以 Correlation ID 作为键,以及打开键的排序列表。

然后,Java 类监视最小键上的完整集合,并在该集合可用于处理时向 Mule 流发送信号。

Mule 流在处理文件时必须进入同步模式,以防止出现数据竞争情况。完成时,它向 Java 类发出处理完成的信号,并且可以从列表/映射中删除数据集,并且如果下一组已准备好处理,则会收到返回的指示。

它不是最漂亮的,我宁愿不为此使用自定义功能,但它可以完成工作。

于 2015-08-06T21:12:50.213 回答