0

我需要在 Mule ESB CE 3.2.1 上聚合内存中的许多 csv 入站文件,如有必要对它们进行重新排序。

我怎么能实现这种逻辑?

我尝试使用 message-chunking-aggregator-router,但它在启动时失败,因为 xsd 架构不接受这样的配置:

<message-chunking-aggregator-router timeout="20000" failOnTimeout="false" >
            <expression-message-info-mapping correlationIdExpression="#[header:correlation]"/>
</message-chunking-aggregator-router>

我还尝试将我的相关 ID 附加到入站消息,然后通过自定义聚合器处理它们,但我发现 Mule 内部使用由以下各项组成的密钥:

Serializable key=event.getId()+event.getMessage().getCorrelationSequence();//EventGroup:264

内部 id 每次都不同(如果相关序列是正确的):这样,Mule 不会像我预期的那样只使用相关序列,并且多次处理相同的消息。

最后,我可以重新编写自定义聚合器,但我想使用更统一的技术。

提前致谢,

加布里埃尔


更新

我试过使用 message-chunk-aggregator 但它不符合我的要求,因为它承认重复。

我尝试详细说明我需要涵盖的场景:

  1. Mule 投票(在 SFTP 位置上)
  2. 检测到文件 1“FIXEDPREFIX_1_of_2.zip”并将其保存在内存中的某处(作为打开的 SFTPStream,没关系)。为分组保留一些相关信息:组、序列、组大小。
  3. 再次检测到文件 1“FIXEDPREFIX_1_of_2.zip”,但无法插入,因为会重复
  4. 检测到文件 2“FIXEDPREFIX_2_of_2.zip”,并正确添加
  5. 表示已达到组大小,Mule 使用正确的消息集路由 MessageCollection

关于第 2 点,我很幸运能够从文件名中获取信息并将它们放入 MuleMessage::correlation* 属性中,以便后续组件可以使用它们。

我做到了,但重复的处理方式相同。

再次感谢

加布里埃尔

4

1 回答 1

1

这是与 Mule 3 一起使用的正确路由器:http: //www.mulesoft.org/documentation/display/MULE3USER/Routing+Message+Processors#RoutingMessageProcessors-MessageChunkAggregator

于 2012-04-16T16:14:28.810 回答