问题标签 [spring-integration-sftp]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-integration - 会话超时后如何使用 JSch 和 Spring 集成进行 SFTP
我有一个通过 SFTP 文件的应用程序SftpInboundFileSynchronizer
:
会话工厂是:
SftpInboundFileSynchronizingMessageSource
设置为使用复合触发器进行轮询。
轮询器设置有复合触发器。它有一个 cron 触发器来尝试每天一次 SFTP 预期的文件。如果找不到预期的文件,我有一个覆盖 cron 触发器的周期性触发器。它每 15 分钟尝试一次 SFTP 文件(最多 5 次)。在获取文件或尝试 5 次获取文件后,cron 触发器将覆盖定期触发器。包含这个RetryCompoundTriggerAdvice
逻辑。
RetryCompoundTriggerAdvice
问题是当应用程序在 15 分钟后重新尝试 SFTP 时,服务器会话已超时。因此,JSch 报告:
因此,不会收到更多消息,并且应用程序将不会尝试 SFTP 文件,直到下一个 cron 时间(即第二天)。等待 15 分钟后如何重新尝试 SFTP 文件?
我试着打电话setServerAliveInterval
给DefaultSftpSessionFactory
没有任何运气。注意:在检索文件或尝试 5 次后,我想停止与 SFTP 服务器的通信,直到下一次 cron 运行。
更新 我终于能够执行测试。根据要求添加调试日志。注意:在 prop 文件中调试日志设置如下:
这是日志:
即使我设置了调试级别,JSCH 也不会记录调试语句。从日志看来,SFTP 连接已终止。注意:task-scheduler-6 线程每分钟轮询一次,因为另一个轮询器用于文件系统同步。
spring-integration - Spring Integration 中的动态入站/出站配置
考虑使用 Spring Integration 进行动态入站/出站配置。可以有“n”个客户使用我们的系统,每个客户都有自己的入站 FTP / web 服务配置,以将文件拉入我们的系统进行处理。同样,在处理完这些客户中的每一个之后,都可以拥有需要推送报告(最终结果)的出站 FTP/Web 服务配置。spring 集成是否适合这种情况?如果是,您能否建议或指出任何示例。Web 服务包括 REST 和 SOAP。此外,每个客户是否可以有自己的 cron 表达式来决定何时在 Inbound 期间提取文件,而不是按固定间隔轮询?
java - 如何在使用spring集成轮询目录时过滤掉点和点(默认unix inode)
我正在尝试使用 spring 集成来轮询我的一些目录。由于 spring 集成在 KeyValue 表中写入并获取了 this 。以及.. unix inode 并尝试重复写入我的数据库表,它在尝试几秒钟后抛出异常。我想学习一种方法来忽略它们,以便春天开始。我已经在使用 AcceptOnceFileListFilter。此外,我尝试使用自定义过滤器过滤掉文件名长度小于 3 的任何文件,但它们不起作用。
更新:
我正在使用IgnoreHiddenFileListFilter
但它不工作。我已经缩小了这个问题,因为我的自定义过滤器实现FileListFilter < File >
了所有进入这个过滤器的文件都是 java.io.File 对象并且它不能识别.
和..
作为文件。我确认了这一点,因为我的 sftp 远程过滤器实现FileListFilter< LsEntry >
并且它正确过滤掉.
并且..
我可以在日志中看到。但是我不能LsEntry
在本地使用,因为它会引发类型转换异常。
这是我的过滤器配置:
这是 CustomFilter
}
远程自定义过滤器
}
这是我的调试日志的样子:
但这是我的表的 KeyValue 表 视图
java - Spring Integration DSL for SFTP Outbound with delete
我正在使用
- Sprint 集成(文件、SFTP 等)4.3.6
- 春季启动 1.4.3
- Spring 集成 Java DSL 1.1.4
我正在尝试设置一个 SFTP 出站适配器,它允许我将文件移动到远程系统上的目录,并在我的本地系统中删除或重命名该文件。
因此,例如,我想将文件a.txt放在本地目录中,并将其 SFTP 发送到目录inbound中的远程服务器。传输完成后,我希望删除或重命名a.txt的本地副本。
我玩弄了几种方法。所以这里是我常用的 SessionFactory 进行测试。
这是一个转换器,我必须在消息中添加一些标题
然后我有一个集成流,它使用轮询器来监视本地目录并调用它:
这工作正常,但会留下本地文件。关于如何在上传完成后删除本地文件的任何想法?我应该看看SftpOutboundGateway
吗?
提前致谢!
Artem 的回答非常完美!这是一个快速示例,它在推送后删除本地文件。
spring-integration - 在 spring-ftp 中一次又一次地获取相同的文件,但名称不同
我有一个这样定义的弹簧输入通道
文件名示例为 TEST.SQQ。SQQ 是客户端用来将文件放在 ftp 中的文件格式。但是,我看到 spring ftp 适配器一次又一次地使用不同的文件名拾取同一个文件。所以第一次是TEST.SQQ。然后下一次是TEST.SQQ-20170204.PQQ,下一次是TEST.SQQ-20170204.PQQ.20170304.PQQ。这一直在继续。我有一个过滤器,用于检查已处理文件的名称。但由于每次轮询的文件名都不同,因此所有这些文件都会被挑选出来进行处理。
这是我的 ftp 适配器 -
这是我的 ftp 服务器映像 -
这是我的本地目录图像 -
我不明白为什么同一个文件会一次又一次地被拾取。我会感谢一些帮助!
这是我的文件列表过滤器代码。
spring - 即使我的应用程序停止,Spring-integration-ftp 也会轮询文件
作为问题的后续行动 -
在 spring-ftp 中一次又一次地获取相同的文件,但名称不同
我的 application.xml 中有以下配置
所以有一个我轮询的 ftp 位置,轮询的文件被放置在我本地机器上的以下目录中-
/用户/abhisheksingh/ddrive/test/f
现在有时因为我发现了一个错误并且必须修复它,我停止了 tomcat 服务器。我从本地删除文件,以便下次启动服务器时,可以再次轮询相同的文件。但我发现同一个文件再次被轮询。我的服务器停止了!据我所知,这不应该发生。这就是为什么我发布了我的 application.xml 以了解这里是否有一些东西使线程挂起。还是 spring-integration-ftp 启动了一个不依赖于应用程序的守护线程。请让我解开这个谜。
我在 tomcat 日志中看到以下异常 -
在这一点上,我非常确定 spring-integration ftp poller 不会让 tomcat 正确停止。
我确实使用 jvisualvm 进行了更多调查。我看到有启动的任务调度程序线程不会关闭 ./shutdown.sh 调用以停止 tomcat。
有趣的是,如果我删除 pl 或 pqqOutputFileChannel 适配器中的任何一个,这些调度程序就不会被创建。
这是我的耳朵文件的结构 -
以下是即使在 tomcat 上调用了关闭,日志文件也会继续滚动的日志 -
541576 调试 osicPublishSubscribeChannel - 在通道 'errorChannel' 上发布 (sent=true),消息:ErrorMessage [payload=org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[正在运行,池大小 = 1 , 活动线程 = 0, 排队任务 = 0, 已完成任务 = 1114]] 不接受任务:org.springframework.integration.util.ErrorHandlingTaskExecutor$1@570e117d, headers={id=71d77a8b-17ea-7011-3cb3-ddbf7591321f,时间戳 = 1487870801603}] 541575 调试 osicPublishSubscribeChannel - 在通道“errorChannel”上发布(发送 = true),消息:ErrorMessage [payload = org.springframework.core.task.TaskRejectedException:执行程序 [java.util.concurrent.ThreadPoolExecutor@6d6033da [Running ,池大小 = 1,活动线程 = 0,排队任务 = 1,已完成的任务 = 1113]] 不接受任务:org.springframework.integration.util.ErrorHandlingTaskExecutor$1@44210f79, headers={id=74a26d65-4b42-da1f-cdf1-b77fea8bfdb7, timestamp=1487870801603}] 541576 错误 osihandler.LoggingHandler - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[Running, pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 1113]] 不接受任务: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@7d7cc2a4 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:296) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)在 org.springframework.integration.endpoint。AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201( ScheduledThreadPoolExecutor.java:180) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor $Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 引起:java.util.concurrent.RejectedExecutionException:任务 org.springframework.integration.util.ErrorHandlingTaskExecutor$1@7d7cc2a4 从 java.util.concurrent 被拒绝.ThreadPoolExecutor@6d6033da[正在运行,池大小 = 1,活动线程 = 0,排队任务 = 1,已完成任务 = 1113] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util .concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293)。 .. 还有 11 个在java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 在 java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java: 1369) 在 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多在java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 在 java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java: 1369) 在 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多池大小 = 1,活动线程 = 0,排队任务 = 1,已完成任务 = 1113] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 还有 11 个池大小 = 1,活动线程 = 0,排队任务 = 1,已完成任务 = 1113] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 还有 11 个springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多
PQQ 541778 调试 osifiFtpInboundFileSynchronizer - 1 个文件已传输 541779 信息 cefpEvergeFileListFilter - 检查文件是否已被处理 /Users/abhisheksingh/ddrive/everge_ws/pqqArchive/TEST4.PQQ 541779 信息 cefpEvergeFileListFilter - 存档文件名是 541779 信息 cefpEverge 是输入文件名TEST4 541779 INFO cefpEvergeFileListFilter - 存档文件名为 TEST4 541779 INFO cefpEvergeFileListFilter - 输入文件名为 TEST4 541779 INFO cefpEvergeFileListFilter - 文件已被处理 TEST4 541779 错误 cefpEvergeFileListFilter - PQQ 文件已被处理。541779 调试 osieSourcePollingChannelAdapter - 在轮询期间未收到任何消息,返回“假” 541974 调试 osie
java - 为什么 SFTP 入站/出站通道适配器有单独的通道声明,为什么简单文件入站/出站通道适配器没有?
我通过Spring Integration File Support教程开发了一个代码,其中我从特定位置轮询文件并进一步处理它们。出于轮询目的,我使用了Spring Integration 的入站和出站通道适配器,因此我的bean.xml与以下内容相同:
现在我的主要课程是:
并且有一个处理文件的处理程序类,这对我来说很好用。
现在的问题是我想为 SFTP 远程服务器创建相同的机制,并从该位置轮询文件,并将处理后的文件放在不同文件夹中的相同 SFTP 位置。
所以我相应地配置了我的 bean.xml 并为 SFTP 编写了入站和出站通道适配器。当我来到 service-activator 部分时,我发现我需要为 Inbound 和 Outbound 通道适配器配置单独的通道,并在 service-activator 中提供它们的 id,我在简单的文件轮询器中没有看到它工作正常。那么为什么入站和出站通道适配器需要单独的通道呢?如果需要它们,我们如何为计划文件轮询服务实现相同的功能?
spring-integration - Spring 与 RedisLockRegistry 的集成示例
我们正在实现一个流程,其中<int-sftp:inbound-streaming-channel-adapter/>
轮询文件的目录,并在找到时将流传递给服务激活器。
问题是我们将运行应用程序的多个实例,我们希望锁定进程,以便只有一个实例可以获取文件。
查看文档,Redis Lock Registry 看起来是解决方案,有没有在 xml 中使用的示例?
我所能找到的只是一些对它的引用和它的源代码。
http://docs.spring.io/spring-integration/reference/html/redis.html点 24.1
添加信息: 我添加了 RedisMetaDataStore 和 SftpSimplePatternFileListFilter。它确实有效,但确实有一个奇怪之处,当轮询器激活 sftpInboundAdapter 时,它会为元数据存储中的每个文件添加一个条目。假设有 10 个文件,数据存储中将有 10 个条目,但它不会处理“1 go”中的所有 10 个文件,适配器每次轮询只处理 1 个文件,这很好,但在多实例中环境 如果拾取文件的服务器在处理 5 个文件后停机,则除非文件被“触摸”,否则另一台服务器似乎不会拾取剩余的 5 个文件。
每次轮询获取 1 个文件的行为是否正确,或者是否应该在一次轮询期间处理所有有效文件。
下面是我的 XML
junit - 用于 SFTP 连接的 Spring junit 测试:com.jcraft.jsch.JSchException:数据包损坏
我一直在我的本地重新创建这个例子:https ://github.com/spring-projects/spring-integration-samples/tree/master/basic/sftp
但这次只使用注释而不是 xml 配置。我有这样的事情:
Sftp普通类:
SftpConfig 类:
在我的测试中,我有这样的事情:
我的 stackTrace 是这个:
org.springframework.messaging.MessagingException: 获取池项失败;嵌套异常是 java.lang.IllegalStateException: failed to create SFTP Session at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:178) at org.springframework.integration.file.remote.session.CachingSessionFactory.getSession( CachingSessionFactory.java:123) 在 com.spring.example.sftp.SftpOutboundTransferSample.testOutbound(SftpOutboundTransferSample.java:62) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java: 57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.爪哇:
我已经在 Internet 上进行了搜索,但老实说,我不明白为什么会出现此异常:
com.jcraft.jsch.JSchException:数据包损坏
我的 log4j.xml 配置:
[更新]完整 ECLIPSE 的输出(控制台)在这里:https ://gist.github.com/columb1a/fe2d4dccabb6a5d9cecd3225d2a591dc
java - 如何在 Spring Integration 中使用通道但仅使用 java config
我一直在阅读 Internet 上的 Spring 文档和论坛。我做了一些与此非常相似的事情:sftp with java config in spring
关键是我想使用通道发送消息,所以我创建了一个像这样的方法
但我不知道如何获取通道以便将其设置为 MethodToSendMessageUsingSftp 方法并发送消息。
顺便说一句,我做所有这些事情只是因为我在我的项目中将一些东西从 xml 更改为 java config:
所以,我的旧项目如下所示:
如果你看到了,在我的uploadToSftpTasklet
我指的是sftpChannel
, 这就是我想要使用 java config 做的事情。