我正在使用基于 XML 的 spring 集成并用于s3-inbound-streaming-channel-adapter
从单个s3 存储桶流式传输。
我们现在需要从两个s3 存储桶进行流式传输。
那么是否可以s3-inbound-streaming-channel-adapter
从多个存储桶中流式传输?
或者我需要s3-inbound-streaming-channel-adapter
为每个s3 存储桶创建一个单独的存储桶吗?
这是我当前为单个 s3 存储桶设置的,它确实有效。
<int-aws:s3-inbound-streaming-channel-adapter
channel="s3Channel"
session-factory="s3SessionFactory"
filter="acceptOnceFilter"
remote-directory-expression="'bucket-1'">
<int:poller fixed-rate="1000"/>
</int-aws:s3-inbound-streaming-channel-adapter>
提前致谢。
更新:
我最终得到了两个 s3-inbound-streaming-channel-adapter
,如下面的Artem Bilan所述。
但是,对于每个入站适配器,我必须分别声明acceptOnceFilter 和 metadataStore 的实例。
这是因为如果我只有一个 acceptOnceFilter 和 metadataStore 实例,并且它们被两个入站适配器共享,那么就会开始发生一些奇怪的循环。
例如,当file_1.csv 到达bucket-1并得到处理时,如果您将相同的file_1.csv 放在bucket-2上,则开始发生奇怪的循环。不知道为什么!所以我最终为每个入站适配器创建了 acceptOnceFilter 和 metadataStore。
`
<!-- ===================================================== -->
<!-- Region 1 s3-inbound-streaming-channel-adapter setting -->
<!-- ===================================================== -->
<bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
<bean id="acceptOnceFilter"
class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStore"/>
<constructor-arg index="1" value="streaming"/>
</bean>
<int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
channel="s3Channel"
session-factory="s3SessionFactory"
filter="acceptOnceFilter"
remote-directory-expression="'${s3.bucketOne.name}'">
<int:poller fixed-rate="1000"/>
</int-aws:s3-inbound-streaming-channel-adapter>
<int:channel id="s3Channel">
<int:queue capacity="50"/>
</int:channel>
<!-- ===================================================== -->
<!-- Region 2 s3-inbound-streaming-channel-adapter setting -->
<!-- ===================================================== -->
<bean id="metadataStoreRegion2" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
<bean id="acceptOnceFilterRegion2"
class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStoreRegion2"/>
<constructor-arg index="1" value="streaming"/>
</bean>
<int-aws:s3-inbound-streaming-channel-adapter id="s3Region2"
channel="s3ChannelRegion2"
session-factory="s3SessionFactoryRegion2"
filter="acceptOnceFilterRegion2"
remote-directory-expression="'${s3.bucketTwo.name}'">
<int:poller fixed-rate="1000"/>
</int-aws:s3-inbound-streaming-channel-adapter>
<int:channel id="s3ChannelRegion2">
<int:queue capacity="50"/>
</int:channel>
`