我需要在 Mule ESB CE 3.2.1 上聚合内存中的许多 csv 入站文件,如有必要对它们进行重新排序。
我怎么能实现这种逻辑?
我尝试使用 message-chunking-aggregator-router,但它在启动时失败,因为 xsd 架构不接受这样的配置:
<message-chunking-aggregator-router timeout="20000" failOnTimeout="false" >
<expression-message-info-mapping correlationIdExpression="#[header:correlation]"/>
</message-chunking-aggregator-router>
我还尝试将我的相关 ID 附加到入站消息,然后通过自定义聚合器处理它们,但我发现 Mule 内部使用由以下各项组成的密钥:
Serializable key=event.getId()+event.getMessage().getCorrelationSequence();//EventGroup:264
内部 id 每次都不同(如果相关序列是正确的):这样,Mule 不会像我预期的那样只使用相关序列,并且多次处理相同的消息。
最后,我可以重新编写自定义聚合器,但我想使用更统一的技术。
提前致谢,
加布里埃尔
更新
我试过使用 message-chunk-aggregator 但它不符合我的要求,因为它承认重复。
我尝试详细说明我需要涵盖的场景:
- Mule 投票(在 SFTP 位置上)
- 检测到文件 1“FIXEDPREFIX_1_of_2.zip”并将其保存在内存中的某处(作为打开的 SFTPStream,没关系)。为分组保留一些相关信息:组、序列、组大小。
- 再次检测到文件 1“FIXEDPREFIX_1_of_2.zip”,但无法插入,因为会重复
- 检测到文件 2“FIXEDPREFIX_2_of_2.zip”,并正确添加
- 表示已达到组大小,Mule 使用正确的消息集路由 MessageCollection
关于第 2 点,我很幸运能够从文件名中获取信息并将它们放入 MuleMessage::correlation* 属性中,以便后续组件可以使用它们。
我做到了,但重复的处理方式相同。
再次感谢
加布里埃尔