我们正在编写一个批处理作业,它将文件作为来自 FTP 的输入,生成一些新文件并将它们写入 S3 存储桶,为此我们使用 Spring Integration。
FTP 中的文件是从数据库中提取的,每晚都会更新。
问题是,当我们第一次启动应用程序时,它很好地连接到了FTP,下载了文件,并上传了生成结果S3。然后我们在本地删除下载的文件,等待FTP中的文件的下一代重新启动该过程。但它永远不会再次下载文件。
任何想法?
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(ftpReader(),
spec -> spec.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(period)))
.enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
.releaseStrategy(group -> group.getMessages().size() == 2))
.transform(stockUnmarshaller)
.transform(stockTransformer)
.transform(stockMarshaller)
.transform(picturesDownloader)
.transform(picturesZipper)
.transform(stockIndexer)
.handle(directoryCleaner)
.nullChannel();
}
@Bean
public FtpInboundChannelAdapterSpec ftpReader() {
return Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(rootFolder)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
sessionFactory.setHost(host);
sessionFactory.setUsername(userName);
sessionFactory.setPassword(password);
sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
return sessionFactory;
}
提前致谢。
编辑:
enrichHeaders
如果我们正好有 2 个文件,我会使用它来确保触发管道。也许标题没有被删除并且条件总是大于2?也许这是错误的进行方式?
再次感谢。