0

在我们的项目中,我们需要从远程 ftp 服务器检索价格。在办公时间内,这工作正常,价格被检索并成功处理。办公时间之后,FTP 服务器上没有发布新的价格,所以正如预期的那样,我们没有发现任何新的价格。

我们的问题是,在几个小时没有找到新价格之后,轮询器停止了轮询。日志文件中没有错误(即使org.springframework.integration在调试级别上运行)也没有异常。我们现在使用单独TaskExecutor的来隔离问题,但轮询器仍然停止。与此同时,我们调整了 cron 表达式以匹配这些时间,以限制资源使用,但轮询器仍然在它应该运行时停止。

非常感谢解决此问题的任何帮助!

我们使用@InboudChannelAdapteron a FtpStreamingMessageSource,其配置如下:

@Bean
    @InboundChannelAdapter(
        value = FTP_PRICES_INBOUND,
        poller = [Poller(
            maxMessagesPerPoll = "\${ftp.fetch.size}",
            cron = "\${ftp.poll.cron}",
            taskExecutor = "ftpTaskExecutor"
        )],
        autoStartup = "\${ftp.fetch.enabled:false}"
    )
    fun ftpInboundFlow(
        @Value("\${ftp.remote.prices.dir}") pricesDir: String,
        @Value("\${ftp.remote.prices.file.pattern}") remoteFilePattern: String,
        @Value("\${ftp.fetch.size}") fetchSize: Int,
        @Value("\${ftp.fetch.enabled:false}") fetchEnabled: Boolean,
        clock: Clock,
        remoteFileTemplate: RemoteFileTemplate<FTPFile>,
        priceParseService: PriceParseService,
        ftpFilterOnlyFilesFromMaxDurationAgo: FtpFilterOnlyFilesFromMaxDurationAgo
    ): FtpStreamingMessageSource {
        val messageSource = FtpStreamingMessageSource(remoteFileTemplate, null)

        messageSource.setRemoteDirectory(pricesDir)
        messageSource.maxFetchSize = fetchSize
        messageSource.setFilter(
            inboundFilters(
                remoteFilePattern,
                ftpFilterOnlyFilesFromMaxDurationAgo
            )
        )
        return messageSource;
    }

属性值为:

poll.cron: "*/30 * 4-20 * * MON-FRI"
fetch.size: 10
fetch.enabled: true

我们限制每分钟使用检索的 poll.cron。

在相关DefaultFtpSessionFactory的 中,超时设置为 60 秒以覆盖默认值 -1(这意味着根本没有超时):

sessionFactory.setDataTimeout(timeOut)
sessionFactory.setConnectTimeout(timeOut)
sessionFactory.setDefaultTimeout(timeOut)
4

2 回答 2

0

也许我的回答似乎有点太简单了,是不是因为你的 cron 表达式表明它应该在 4 到 20 小时之间安排工作。晚上 8:00 之后,它将不再安排作业,并将在凌晨 4:00 再次开始轮询。

于 2020-04-06T17:34:01.217 回答
0

事实证明,处理时间比计划的时间间隔长,因此在处理过程中已经执行了一个新任务。所以最终多个任务都试图完成同样的事情。

我们通过在轮询器上使用 fixedDelay 而不是 fixedRate 解决了这个问题。不同之处在于,如果任务完成,fixedRate 会按固定间隔进行调度,而 fixedDelay 会在任务完成后安排延迟。

于 2020-04-22T09:04:21.047 回答