0

我的方案是在文件被修改时仅将一个文件从 AWS S3 存储桶传输到 EC2 实例一次。我使用以下配置并在服务器启动时手动启动适配器。

问题是服务器启动时重复执行 5 或 6 次。看起来正在执行不同的线程。我能够在日志中看到不同的任务执行器,不知道它是轮询器问题还是适配器问题。

我正在使用服务激活器根据 S3 位置中的文件更改执行一些其他操作。

注意:此问题仅在启动时发生一次。进一步的文件修改工作正常。

配置:

<bean id="s3SessionFactory" 
            class="org.springframework.integration.aws.support.S3SessionFactory"></bean>
        <bean id="acceptOnceFilter"
            class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
        <task:executor id="s3PollingExecutor" pool-size="1" queue-capacity="10" />
        <integration:channel id="s3FilesChannel"/>
        <int-aws:s3-inbound-channel-adapter id="s3FileInbound"
                     channel="s3FilesChannel" 
                     session-factory="s3SessionFactory" 
                     auto-create-local-directory="false"
                     delete-remote-files="false" 
                     preserve-timestamp="true"
                     filter="acceptOnceFilter"
                     local-directory="local_directory"
                     auto-startup="false" 
                     remote-directory="s3_bucket">
            <integration:poller id="s3FilesChannelPoller" 
                                fixed-rate="1000" 
                                max-messages-per-poll="1" time-unit="MILLISECONDS" 
                                task-executor="s3PollingExecutor">
            </integration:poller>
        </int-aws:s3-inbound-channel-adapter>
        <integration:service-activator id="s3FilesChannelWatcher" 
                                       input-channel="s3FilesChannel" 
                                       output-channel="nullChannel"
                                       ref="configurationFileWatcher" 
                                       method="getConfigurationFileWatcher">
        </integration:service-activator>                                      

正如你所建议的,我尝试了以下方法。

<bean id="acceptOnceFilterRegion"
      class="cS3FileFilterOnLastModifiedTime">
    <constructor-arg index="0" ref="metaDataStoreRegion"/>
    <constructor-arg index="1" value="*"/>
</bean>                                                                                                

添加了用于检查上次修改时间的逻辑

import org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import com.amazonaws.services.s3.model.S3ObjectSummary;
public class S3FileFilterOnLastModifiedTime extends S3PersistentAcceptOnceFileListFilter {

    Long delayTime = 1000L;

    public S3FileFilterOnLastModifiedTime(ConcurrentMetadataStore store, String prefix) {
        super(store, prefix);
    }

    @Override
    public boolean accept(S3ObjectSummary file) {
        long lastModified = modified(file);
        long currentTime = System.currentTimeMillis();
        long timeDifference = currentTime - lastModified;
        return timeDifference > delayTime;
    }   
} 

仍然没有希望。日志是这样的.......

[INFO ] 2018-10-11 11:22:10,888 [s3PollingExecutor-1] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,892 [s3PollingExecutor-2] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,892 [s3PollingExecutor-3] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,893 [s3PollingExecutor-4] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,893 [s3PollingExecutor-5] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,894 [s3PollingExecutor-6] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
4

1 回答 1

0

我认为您错过了这样一个事实,即您s3_bucket在启动时不是空的,并且在启动时会<int-aws:s3-inbound-channel-adapter>拾取所有文件。这就是您如何看待这几个任务:每个任务都针对每个文件。

如果您真的只担心该存储桶中的新文件,则需要考虑不要使用 in-memory AcceptOnceFileListFilter,而是切换到基于共享实现的一些持久MetadataStore实现。为此目的,S3PersistentAcceptOnceFileListFilter在AWS 上有一个过滤结果spring-integration-aws并将其持久化: https ://github.com/spring-projects/spring-integration-aws#metadata-store-for-amazon-dynamodbDynamoDbMetadataStoreDynamoDb

于 2018-10-10T18:54:07.897 回答