0

我们正在编写一个批处理作业,它将文件作为来自 FTP 的输入,生成一些新文件并将它们写入 S3 存储桶,为此我们使用 Spring Integration。

FTP 中的文件是从数据库中提取的,每晚都会更新。

问题是,当我们第一次启动应用程序时,它很好地连接到了FTP,下载了文件,并上传了生成结果S3。然后我们在本地删除下载的文件,等待FTP中的文件的下一代重新启动该过程。但它永远不会再次下载文件。

任何想法?

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlows
                .from(ftpReader(),
                        spec -> spec.id("ftpInboundAdapter")
                                .autoStartup(true)
                                .poller(Pollers.fixedDelay(period)))
                .enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
                .aggregate(aggregatorSpec -> aggregatorSpec
                        .correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
                        .releaseStrategy(group -> group.getMessages().size() == 2))
                .transform(stockUnmarshaller)
                .transform(stockTransformer)
                .transform(stockMarshaller)
                .transform(picturesDownloader)
                .transform(picturesZipper)
                .transform(stockIndexer)
                .handle(directoryCleaner)
                .nullChannel();
    }

    @Bean
    public FtpInboundChannelAdapterSpec ftpReader() {
        return Ftp.inboundAdapter(ftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(rootFolder)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory));
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
        sessionFactory.setHost(host);
        sessionFactory.setUsername(userName);
        sessionFactory.setPassword(password);
        sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return sessionFactory;
    }

提前致谢。

编辑:

enrichHeaders如果我们正好有 2 个文件,我会使用它来确保触发管道。也许标题没有被删除并且条件总是大于2?也许这是错误的进行方式?

再次感谢。

4

2 回答 2

2

听起来你在谈论同一个文件。在这种情况下,从本地目录中删除它是不够的。该过程中涉及一些FileListFilter实例,其中包含已处理文件的条目。并且根据您的配置,您可以处理内存中的变体。他们真的对您的本地文件删除一无所知。

确切地说,您需要担心两个过滤器:FtpPersistentAcceptOnceFileListFilter远程条目和FileSystemPersistentAcceptOnceFileListFilter文件的本地副本。它们都在实现,因此,您可以在完成文件处理时ResettableFileListFilter调用它们。remove()

FtpInboundChannelAdapterSpecJava DSL 有以下选项:

/**
 * Configure a {@link FileListFilter} to be applied to the remote files before
 * copying them.
 * @param filter the filter.
 * @return the spec.
 */
public S filter(FileListFilter<F> filter) {

/**
 * A {@link FileListFilter} used to determine which files will generate messages
 * after they have been synchronized.
 * @param localFileListFilter the localFileListFilter.
 * @return the spec.
 * @see AbstractInboundFileSynchronizingMessageSource#setLocalFilter(FileListFilter)
 */
public S localFilter(FileListFilter<File> localFileListFilter) {

因此,您仍然可以将那些提到的过滤器作为默认过滤器,但是您将它们提取为 bean 并注入这些选项并注入到您directoryCleaner的过滤器中以执行从这些过滤器中删除的操作。

还有一个选项,例如:

/**
 * Switch the local {@link FileReadingMessageSource} to use its internal
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
 * @param useWatchService the {@code boolean} flag to switch to
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
 * @since 5.0
 */
public void setUseWatchService(boolean useWatchService) {

并且DELETE还为观察者配置了事件。发生这种情况时,已删除的文件也会从本地过滤器中删除。

您还可以在配置时正确处理远程文件:

/**
 * Set to true to enable the preservation of the remote file timestamp when transferring.
 * @param preserveTimestamp true to preserve.
 * @return the spec.
 */
public S preserveTimestamp(boolean preserveTimestamp) {

这样,具有相同名称的较新文件将被视为不同的文件,并且其在上述过滤器中的条目将被覆盖。虽然我看到你已经在使用它了,但是你仍然抱怨它不起作用。当FileSystemPersistentAcceptOnceFileListFilter不用于本地文件时,某些旧版本的 Spring Integration 可能会出现这种情况。

于 2019-10-28T13:36:23.237 回答
1

入站通道适配器有两个过滤器.filter.localFilter.

第一个在下载前过滤远程文件,第二个过滤文件系统上的文件。

默认情况下,filterFtpPersistentAcceptOnceFileListFilter只会获取新的或更改的文件。

默认情况下,localFilter如果FileSystemPersistentAcceptOnceFileListFilter时间戳已更改,它只会再次传递文件。

因此,只有在其时间戳发生更改时,才会重新处理该文件。

我建议你在调试器中运行,看看为什么它没有通过过滤器。

于 2019-10-28T13:34:54.360 回答