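I am trying to set up fetching of new files from several FTP servers and directories for post-processing. Currently it only connects to the first server in the collection. Please tell me what I am doing wrong.
With my implementation, the switch to the second server does not work and I get this error: LoggingHandler - org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving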
FtpConfiguration.java
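import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.log4j.Log4j2;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.remote.aop.RotatingServerAdvice;
import org.springframework.integration.file.remote.aop.RotationPolicy;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.dsl.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
import org.springframework.integration.transformer.StreamTransformer;

@Log4j2
@Configuration
@PropertySource("classpath:/ftp.properties")
public class FtpConfiguration {

    // Comma-separated property values; entries at the same index describe one server.
    @Value("${sftp.name}")
    private String[] names;
    @Value("${sftp.host}")
    private String[] hosts;
    @Value("${sftp.user}")
    private String[] users;
    @Value("${sftp.pwd}")
    private String[] pwds;

    private final MessageService messageService;

    @Autowired
    public FtpConfiguration(MessageService messageService) {
        this.messageService = messageService;
    }

    // One session factory per server, keyed by server name; the first one is the default.
    @Bean
    public DelegatingSessionFactory<FTPFile> delegatingSessionFactory() {
        Map<Object, SessionFactory<FTPFile>> factories = new LinkedHashMap<>();
        for (int i = 0; i < this.names.length; i++) {
            DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
            factory.setHost(this.hosts[i]);
            factory.setUsername(this.users[i]);
            factory.setPassword(this.pwds[i]);
            factory.setClientMode(2); // 2 = FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE
            factories.put(this.names[i], factory);
        }
        return new DelegatingSessionFactory<>(factories, factories.values().iterator().next());
    }

    // Rotates between (server key, remote directory) pairs; the last argument is the "fair" flag.
    @Bean
    public RotatingServerAdvice advice() {
        List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
        keyDirectories.add(new RotationPolicy.KeyDirectory(this.names[0], "/test"));
        keyDirectories.add(new RotationPolicy.KeyDirectory(this.names[1], "/prod"));
        return new RotatingServerAdvice(delegatingSessionFactory(), keyDirectories, true);
    }

    @Bean
    public IntegrationFlow ftpIntegrationFlow() {
        return IntegrationFlows.from(
                        Ftp.inboundStreamingAdapter(template())
                                .remoteDirectory("."),
                        e -> e.poller(Pollers.fixedDelay(500).advice(advice())))
                .transform(new StreamTransformer("UTF-8"))
                .handle(message -> {
                    // Look up each header separately, then join them into a path for the log.
                    log.info("Read file: {}/{}",
                            message.getHeaders().get("file_remoteDirectory"),
                            message.getHeaders().get("file_remoteFile"));
                    messageService.unmarshall(message);
                })
                .get();
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(delegatingSessionFactory());
    }
}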
ftp.properties
sftp.name=test,prod
sftp.host=10.10.10.11,10.10.10.12
sftp.user=user1,user2
sftp.pwd=pass1,pass2
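It polls the first server, fetches a file, moves to the second server, and then the error occurs. If I swap the order of the servers, the same thing happens: it receives one file from the first server, fails with the error on the other one, then goes back to the first and fetches successfully. I would like to take all the files from one server before moving on to the next one. Is that possible?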
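To make the behaviour I am after concrete, here is a small plain-Java model of the two rotation modes as I understand them from the reference documentation: with the fair flag set, the advice moves to the next server after every poll; without it, it moves on only after a poll that returned nothing. This is only a sketch of my understanding. `RotationSketch` and `poll` are hypothetical names, the file names are made up, and none of this is Spring Integration code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of rotating over servers; not part of Spring Integration. */
final class RotationSketch {

    /**
     * Simulates repeated polls over the given per-server file lists and returns
     * the delivery order, each file prefixed with the index of its server.
     */
    static List<String> poll(List<List<String>> serverFiles, boolean fair) {
        List<Deque<String>> queues = new ArrayList<>();
        int remaining = 0;
        for (List<String> files : serverFiles) {
            queues.add(new ArrayDeque<>(files));
            remaining += files.size();
        }
        List<String> delivered = new ArrayList<>();
        int current = 0;
        while (remaining > 0) {
            // One poll: take at most one file from the current server (null if it is empty).
            String file = queues.get(current).poll();
            if (file != null) {
                delivered.add(current + ":" + file);
                remaining--;
            }
            // Fair: rotate after every poll. Non-fair: rotate only after an empty poll.
            if (fair || file == null) {
                current = (current + 1) % queues.size();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<List<String>> files = List.of(List.of("a.xml", "b.xml"), List.of("c.xml"));
        System.out.println(poll(files, true));  // [0:a.xml, 1:c.xml, 0:b.xml]
        System.out.println(poll(files, false)); // [0:a.xml, 0:b.xml, 1:c.xml]
    }
}
```

The second ordering is what I want. In my configuration the third argument of the `RotatingServerAdvice` constructor is `true`, so I assume `false` would correspond to the non-fair case, but I have not confirmed that it avoids the error above.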
Stack trace
[DEBUG] 2021-03-17 23:07:23.352 [task-scheduler-1] DefaultFtpSessionFactory - Connected to server [10.10.10.12:21]
[DEBUG] 2021-03-17 23:07:42.592 [HikariPool-1 housekeeper] HikariPool - HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
[DEBUG] 2021-03-17 23:07:42.592 [HikariPool-1 housekeeper] HikariPool - HikariPool-1 - Fill pool skipped, pool is at sufficient level.
[DEBUG] 2021-03-17 23:08:10.543 [task-scheduler-1] FtpSession - failed to disconnect FTPClient
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:186) ~[?:?]
at java.net.SocketInputStream.read(SocketInputStream.java:140) ~[?:?]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[?:?]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[?:?]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:?]
at java.io.InputStreamReader.read(InputStreamReader.java:185) ~[?:?]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:?]
at java.io.BufferedReader.read(BufferedReader.java:182) ~[?:?]
at org.apache.commons.net.io.CRLFLineReader.readLine(CRLFLineReader.java:58) ~[commons-net-3.7.jar:3.7]
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:320) ~[commons-net-3.7.jar:3.7]
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:299) ~[commons-net-3.7.jar:3.7]
at org.apache.commons.net.ftp.FTP.getReply(FTP.java:731) ~[commons-net-3.7.jar:3.7]
at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1861) ~[commons-net-3.7.jar:3.7]
at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:114) ~[spring-integration-ftp-5.4.2.jar:5.4.2]
at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:155) ~[spring-integration-ftp-5.4.2.jar:5.4.2]
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:230) ~[spring-integration-file-5.4.2.jar:5.4.2]
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:210) ~[spring-integration-file-5.4.2.jar:5.4.2]
at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142) ~[spring-integration-core-5.4.2.jar:5.4.2]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.2.jar:5.3.2]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.2.jar:5.3.2]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.2.jar:5.3.2]
at org.springframework.integration.aop.ReceiveMessageAdvice.invoke(ReceiveMessageAdvice.java:56) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.2.jar:5.3.2]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.2.jar:5.3.2]
at com.sun.proxy.$Proxy125.receive(Unknown Source) ~[?:?]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:408) ~[spring-integration-core-5.4.2.jar:5.4.2]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.2.jar:5.3.2]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-5.3.2.jar:5.3.2]
at com.sun.proxy.$Proxy124.call(Unknown Source) ~[?:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:377) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:324) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.2.jar:5.3.2]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:321) ~[spring-integration-core-5.4.2.jar:5.4.2]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.2.jar:5.3.2]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95) [spring-context-5.3.2.jar:5.3.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
[INFO ] 2021-03-17 23:08:10.554 [task-scheduler-1] FtpStreamingMessageSource - Removing the remote file 'FileInfo [isDirectory=false, isLink=false, Size=4792, ModifiedTime=Tue Mar 09 00:24:00 MSK 2021, Filename=XX.xml, RemoteDirectory=/test, Permissions=----------]' from the filter for a subsequent transfer attempt
[DEBUG] 2021-03-17 23:08:10.555 [task-scheduler-1] PublishSubscribeChannel - preSend on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml, headers={id=37cd10d7-41fd-0ac8-2630-14fd8c3a3cd5, timestamp=1616011690555}]
[DEBUG] 2021-03-17 23:08:10.555 [task-scheduler-1] LoggingHandler - bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml, headers={id=37cd10d7-41fd-0ac8-2630-14fd8c3a3cd5, timestamp=1616011690555}]
[ERROR] 2021-03-17 23:08:10.556 [task-scheduler-1] LoggingHandler - org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:391)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:324)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:321)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.UncheckedIOException: IOException when retrieving /test/XX.xml
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:231)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:210)
at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.integration.aop.ReceiveMessageAdvice.invoke(ReceiveMessageAdvice.java:56)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy125.receive(Unknown Source)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:408)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at com.sun.proxy.$Proxy124.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:377)
... 13 more
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/XX.xml: 550
at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:104)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:221)
... 37 more
[DEBUG] 2021-03-17 23:08:10.557 [task-scheduler-1] PublishSubscribeChannel - postSend (sent=true) on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml, headers={id=37cd10d7-41fd-0ac8-2630-14fd8c3a3cd5, timestamp=1616011690555}]
[DEBUG] 2021-03-17 23:08:11.075 [task-scheduler-2] DefaultFtpSessionFactory - Connected to server [10.10.10.11:21]
[DEBUG] 2021-03-17 23:08:11.201 [task-scheduler-2] SourcePollingChannelAdapter - Poll resulted in Message: GenericMessage [payload=org.apache.commons.net.io.SocketInputStream@77de7f0a,