我正在尝试使 Spring Integration SFTP 从所有子文件夹递归地从远程服务器读取文件(.txt)。远程文件夹类似于“/tmp/remoteFolder”,所有子文件夹都是日期文件夹,例如“/tmp/remoteFolder/20180830”、“/tmp/remoteFolder/20170902”。
这是我到目前为止的代码
@Bean
@InboundChannelAdapter(value = "sftpMgetInputChannel",
poller = @Poller(fixedDelay = "5000"))
public IntegrationFlow sftpMGetFlow() {
return IntegrationFlows.from("sftpMgetInputChannel")
.handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
Command.MGET, "'/tmp/remoteDirectory/*'")
.options(Option.RECURSIVE)
.regexFileNameFilter("((\\d{8})|*\\.txt)")
.localDirectoryExpression("sftp-inbound" + "/" + "#remoteDirectory"))
.channel(remoteFileOutputChannel())
.get();
}
@Bean
public MessageChannel sftpMgetInboundChannel(){
return new DirectChannel();
}
@Bean
public PollableChannel remoteFileOutputChannel() {
return new QueueChannel();
}
如何将 sftp mget 的根远程目录指定为 /tmp/remoteFolder?为什么这不起作用?为什么我需要指定输出通道?
更新:而不是调用 channel(remoteFileOutputChannel()) 我调用这样的处理程序
@Bean
public MessageHandler messageHandler(){
return new MessageHandler() { ... }
}
代码更新:
@InboundChannelAdapter(value = "sftpMgetInputChannel", poller = @Poller(fixedDelay = "5000")) 公共字符串文件ForMGET(){ 返回“'/tmp/input/remoteDirectory/*'”; } @豆 公共集成流 sftpMGetFlow() { return IntegrationFlows.from("sftpMgetInputChannel") .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory, Command.MGET,“有效载荷”) .options(Option.RECURSIVE) .regexFileNameFilter("((\\d{8})|*\\.txt)") .localDirectoryExpression("'sftp-inbound/'" + "#remoteDirectory")) .handler(messageHandler()) 。得到(); } @豆 公共 MessageChannel sftpMgetInboundChannel(){ 返回新的 DirectChannel(); } @豆 公共消息处理程序消息处理程序(){ 返回新的 MessageHandler() { ... } }
使用此更新的代码,我收到以下错误:
rg.springframework.core.NestedIOException:读取文件失败;嵌套异常是 2:没有这样的文件 在 org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:100) 在 org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.read(CachingSessionFactory.java:137) 在 org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory(AbstractInboundFileSynchronizer.java:176) 在 org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:138) 在 org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144) 在 org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89) 在 org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) 在 org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144) 在 org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207) 在 org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 在 org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48) 在 org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 在 org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202) 在 org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51) 在 org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) 在 java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 在 java.util.concurrent.FutureTask.run(FutureTask.java:138) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) 在 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 在 java.lang.Thread.run(Thread.java:680) 原因:2:没有这个文件 在 com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2289) 在 com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:1741) 在 com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1011) 在 com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:986) 在 org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:96) ... 22 更多