1

我的要求是从我的 spring-batch 应用程序的文件服务器中的远程目录下载多个文件。此处的示例之一 建议使用 SourcePollingChannelAdapter。在此示例中,适配器已启动 adapter.start() 但并未停止。那么它的生命周期是如何工作的呢?我的要求是首先下载&只有当所有文件都下载后,然后继续阅读文件。但这似乎是下载文件的一种异步过程。那么,当所有文件都已下载并准备好继续进行时,我将如何获得通知?我没有看到任何其他方法可以将任何处理程序注入适配器/可轮询通道。

使用的示例配置:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapterBean"
        auto-startup="false" channel="receiveChannelBean" session-factory="sftpSessionFactory"
        local-directory="${sftp.download.localDirResource}"
        remote-directory="${sftp.download.remoteFilePath}"
        auto-create-local-directory="true" delete-remote-files="false"
        filename-regex=".*\.csv$">
        <int:poller fixed-rate="5000" max-messages-per-poll="3"  />
    </int-sftp:inbound-channel-adapter>

    <int:channel id="receiveChannelBean">
        <int:queue/>
    </int:channel>

如果显式调用了adapter.stop(),它会抛出InterruptedException。如果未调用 stop ,则文件以异步方式下载而不会阻塞,因此下一步不知道下载是否已完成或正在进行中。

编辑:DownloadingTasklet 的代码片段

 @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        try {

            sftpInbondAdapter.start();
            Thread.sleep(15000); // ---Is this mandatory ?
        } catch (Exception e) {
            throw new BatchClientException("Batch File Download was un-successful !", e);
        } finally {
           // sftpInbondAdapter.stop();
            Logger.logDebug("BATCH_INPUT_FILE_DOWNLOAD", "COMPLETED");
        }
        return RepeatStatus.FINISHED;
    }
4

1 回答 1

0

工程SftpInboundFileSynchronizingMessageSource分两个阶段:

  • 首先,它会从远程目录同步到本地目录:

    Message<File> message = this.fileSource.receive();
    if (message == null) {
        this.synchronizer.synchronizeToLocalDirectory(this.localDirectory);
        message = this.fileSource.receive();
    }
    
  • 正如你看到的那样,它开始将文件作为消息发出。但已经只有本地文件。因此,从 FTP 下载已经完成。

  • 只有在发送过程中清除所有本地文件时,才会进行下一次下载尝试。

所以,你想要的是一个内置的功能。

编辑

我们测试的一些调试日志:

19:29:32,005 DEBUG task-scheduler-1 util.SimplePool:190 - Obtained new org.springframework.integration.sftp.session.SftpSession@26f16625.
19:29:32,036 DEBUG task-scheduler-1 inbound.SftpInboundFileSynchronizer:287 - cannot copy, not a file: /sftpSource/subSftpSource
19:29:32,036 DEBUG task-scheduler-1 session.CachingSessionFactory:187 - Releasing Session org.springframework.integration.sftp.session.SftpSession@26f16625 back to the pool.
19:29:32,036 DEBUG task-scheduler-1 util.SimplePool:221 - Releasing org.springframework.integration.sftp.session.SftpSession@26f16625 back to the pool
19:29:32,037 DEBUG task-scheduler-1 inbound.SftpInboundFileSynchronizer:270 - 3 files transferred
19:29:32,037 DEBUG task-scheduler-1 file.FileReadingMessageSource:380 - Added to queue: [local-test-dir\rollback\ sftpSource1.txt, local-test-dir\rollback\sftpSource2.txt]
19:29:32,043  INFO task-scheduler-1 file.FileReadingMessageSource:368 - Created message: [GenericMessage [payload=local-test-dir\rollback\ sftpSource1.txt, headers={id=40f75bd1-2150-72a4-76f0-f5c619a246da, timestamp=1465514972043}]]

它的配置如下:

<int-sftp:inbound-channel-adapter
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory="/sftpSource"
        local-directory="local-test-dir/rollback"
        auto-create-local-directory="true"
        local-filter="acceptOnceFilter">
    <int:poller fixed-delay="1" max-messages-per-poll="0"/>
</int-sftp:inbound-channel-adapter>

注意它是如何显示3 files transferred的,其中一个是目录,所以跳过了。

怎么说Added to queue:

只有在那之后它才开始将它们作为消息发出。

而一切都只是因为我有fixed-delay="1"。哪怕只有1毫秒。

max-messages-per-poll="0"装置通过一个轮询任务处理所有内容。

于 2016-06-09T22:24:28.577 回答