1

我有一个使用弹簧集成的代码库。

<integration-ftp:inbound-channel-adapter id="ftpInbound"
                               channel="ftpChannel"
                               session-factory="ftpClientFactory"
                               filter="myCustomFilter"                                                                                             
                               auto-create-local-directory="true"
                               delete-remote-files="false"                                                                    
                               remote-directory="/foo/bar"                                                                     
                               local-directory="file:output">
    <integration:poller fixed-rate="5000" max-messages-per-poll="-1"/>
</integration-ftp:inbound-channel-adapter>



<integration:channel id="ftpChannel">
        <integration:queue />
</integration:channel>

<integration:service-activator id="mySA"  method="handleMessage" input-channel="ftpChannel" output-channel="outputChannel" ref="myDataGetter">
    <integration:poller fixed-rate="1000" max-messages-per-poll="-1"/>  
</integration:service-activator>

myCustomFilter 的 bean 工作正常,我在 myDataGetter bean 的 handleMessage() 方法中获取过滤文件。

到目前为止,一切都很好。现在在 myDataGetter bean 中,我正在根据日期进一步过滤文件,例如轮询器会得到 9 个文件,但实际上它们是 3 个版本的 3 个日期。

data_file1.20130816
data_file1.20130815
data_file1.20130814
data_file2.20130816
data_file2.20130815
data_file2.20130814
data_file3.20130816
data_file3.20130815
data_file3.20130814

现在我的目标是获取最新的 3 个文件,即 data_file1,2 和 3 的 20130816 版本。所以我在 handleMessage 方法中建立了一个逻辑来构建包含这 3 个文件的最新版本的 HashMap。它使用迭代数据列表并将主题中的文件与其进行比较的幼稚逻辑完成。经过几次迭代,我得到了包含最新 3 个文件的 HashMap 构建。

现在我的下一个要求是通过通道将这 3 个文件传递给下一个 bean。

但是应该从通道读取数据的 bean 应该只在 HashMap 完全使用最新的 3 个数据文件构建时读取。在 SI 中是否可以做任何事情,以便只有在处理所有传入数据并从中过滤出一组数据后才将数据放入下一个通道?

我可以考虑让 myDataGetter 像观察者一样可观察,并且让下一个频道的 bean 作为观察者。但它不适合 SI 中的工作方式。

任何意见?

4

1 回答 1

1

您可以将 an<aggregator/>与自定义发布策略一起使用。

于 2013-08-16T17:48:33.617 回答