1

我正在使用基于 XML 的 spring 集成并用于s3-inbound-streaming-channel-adapter单个s3 存储桶流式传输。

我们现在需要从两个s3 存储桶进行流式传输。

那么是否可以s3-inbound-streaming-channel-adapter多个存储桶中流式传输?

或者我需要s3-inbound-streaming-channel-adapter每个s3 存储桶创建一个单独的存储桶吗?

这是我当前为单个 s3 存储桶设置的,它确实有效。

<int-aws:s3-inbound-streaming-channel-adapter 
channel="s3Channel"
session-factory="s3SessionFactory" 
filter="acceptOnceFilter"
remote-directory-expression="'bucket-1'">
    <int:poller fixed-rate="1000"/>
</int-aws:s3-inbound-streaming-channel-adapter>

提前致谢。

更新:

我最终得到了两个 s3-inbound-streaming-channel-adapter,如下面的Artem Bilan所述。

但是,对于每个入站适配器,我必须分别声明acceptOnceFilter 和 metadataStore 的实例。

这是因为如果我只有一个 acceptOnceFilter 和 metadataStore 实例,并且它们被两个入站适配器共享,那么就会开始发生一些奇怪的循环。

例如,当file_1.csv 到达bucket-1并得到处理时,如果您将相同的file_1.csv 放在bucket-2上,则开始发生奇怪的循环。不知道为什么!所以我最终为每个入站适配器创建了 acceptOnceFilter 和 metadataStore。

`

    <!-- ===================================================== -->
    <!-- Region 1 s3-inbound-streaming-channel-adapter setting -->
    <!-- ===================================================== -->

    <bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

    <bean id="acceptOnceFilter"
          class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
        <constructor-arg index="0" ref="metadataStore"/>
        <constructor-arg index="1" value="streaming"/>
    </bean>

    <int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
                                                  channel="s3Channel"
                                                  session-factory="s3SessionFactory"
                                                  filter="acceptOnceFilter"
                                                  remote-directory-expression="'${s3.bucketOne.name}'">
        <int:poller fixed-rate="1000"/>
    </int-aws:s3-inbound-streaming-channel-adapter>

    <int:channel id="s3Channel">
        <int:queue capacity="50"/>
    </int:channel>

    <!-- ===================================================== -->
    <!-- Region 2 s3-inbound-streaming-channel-adapter setting -->
    <!-- ===================================================== -->

    <bean id="metadataStoreRegion2" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

    <bean id="acceptOnceFilterRegion2"
          class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
        <constructor-arg index="0" ref="metadataStoreRegion2"/>
        <constructor-arg index="1" value="streaming"/>
    </bean>

    <int-aws:s3-inbound-streaming-channel-adapter id="s3Region2"
                                                  channel="s3ChannelRegion2"
                                                  session-factory="s3SessionFactoryRegion2"
                                                  filter="acceptOnceFilterRegion2"
                                                  remote-directory-expression="'${s3.bucketTwo.name}'">
        <int:poller fixed-rate="1000"/>
    </int-aws:s3-inbound-streaming-channel-adapter>

    <int:channel id="s3ChannelRegion2">
        <int:queue capacity="50"/>
    </int:channel>

`

4

1 回答 1

1

没错,目前的实现只支持单次remote directory定期轮询。我们现在确实正在努力将这样的解决方案正式化为开箱即用的功能。已经报告了对 (S)FTP 支持的类似请求,尤其是在配置期间事先不知道目标目录时。

如果您为目录中的每个配置多个通道适配器不是什么大问题,那就太好了。您始终可以将消息从它们发送到同一通道进行处理。

否则,您可以考虑通过以下方式循环存储桶列表:

  <xsd:attribute name="remote-directory-expression" type="xsd:string">
        <xsd:annotation>
            <xsd:documentation>
                Specify a SpEL expression which will be used to evaluate the directory
                path to where the files will be transferred
                (e.g., "headers.['remote_dir'] + '/myTransfers'" for outbound endpoints)
                There is no root object (message) for inbound endpoints
                (e.g., "@someBean.fetchDirectory");
            </xsd:documentation>
        </xsd:annotation>
    </xsd:attribute>

在一些豆子里。

于 2018-06-04T15:55:29.713 回答