作为问题的后续行动 -
在 spring-ftp 中一次又一次地获取相同的文件,但名称不同
我的 application.xml 中有以下配置
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<int:poller id="poller" task-executor="synchTaskExecutor" default="true" fixed-delay="1000" />
<beans:bean id="ftpClientFactory"
class="com.everge.springframework.integration.ftp.session.EvergeFtpSessionFactory">
<beans:property name="host" value="111.93.128.170"/>
<beans:property name="port" value="21"/>
<beans:property name="username" value="singha"/>
<beans:property name="password" value="singha16"/>
<beans:property name="clientMode" value="2"></beans:property>
</beans:bean>
<beans:bean id="ftpOutClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<beans:property name="host" value="111.93.128.170"/>
<beans:property name="port" value="21"/>
<beans:property name="username" value="singha"/>
<beans:property name="password" value="singha16"/>
<beans:property name="clientMode" value="2"></beans:property>
</beans:bean>
<beans:bean id="synchTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<beans:property name="corePoolSize" value="1"></beans:property>
<beans:property name="maxPoolSize" value="1"></beans:property>
<beans:property name="queueCapacity" value="1"></beans:property>
</beans:bean>
<beans:bean id="pqqFtpClientFactory"
class="com.everge.springframework.integration.ftp.session.PqqEvergeFtpSessionFactory">
<beans:property name="host" value="111.93.128.170"/>
<beans:property name="port" value="21"/>
<beans:property name="username" value="singha"/>
<beans:property name="password" value="singha16"/>
<beans:property name="clientMode" value="2"></beans:property>
</beans:bean>
<int:channel id="ftpChannel">
<int:queue/>
</int:channel>
<beans:bean id="acceptAllFileListFilter" class="com.everge.file.processing.EvergeFileListFilter"/>
<beans:bean id="pqqHandler" class="com.everge.pqq.PqqFileHandler">
<beans:property name="config" ref="baseConfig"></beans:property>
</beans:bean>
<beans:bean id="handler" scope="prototype" class="com.everge.integration.client.FileHandler">
<beans:property name="config" ref="baseConfig"></beans:property>
</beans:bean>
<beans:bean id="baseConfig" class="com.everge.config.BaseConfig" />
<beans:bean id="ftpSplitter" class="com.everge.service.FtpSplitter" />
<beans:bean id="fileSplitter" class="com.everge.file.processing.FileSplitter" />
<int-ftp:outbound-channel-adapter id="notifFtpOutBound"
channel="pl"
remote-directory="/ADPWG/PRCSD1"
session-factory="ftpOutClientFactory" auto-startup="true">
<int-ftp:request-handler-advice-chain>
<int:retry-advice />
</int-ftp:request-handler-advice-chain>
</int-ftp:outbound-channel-adapter>
<int-ftp:outbound-channel-adapter id="ftpOutbound"
channel="pqqOutputFileChannel"
remote-directory="/ADPWG/PRCSD"
session-factory="ftpOutClientFactory" auto-startup="true">
<int-ftp:request-handler-advice-chain>
<int:retry-advice />
</int-ftp:request-handler-advice-chain>
</int-ftp:outbound-channel-adapter>
<file:inbound-channel-adapter prevent-duplicates="false" id="filesIn1" directory="file:/Users/abhisheksingh/ddrive/everge_ws/f" auto-startup="true">
<int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000"></int:poller>
</file:inbound-channel-adapter>
<int:service-activator input-channel="filesIn1" ref="handler" />
<file:inbound-channel-adapter prevent-duplicates="false" id="pqqInputFileChannel" directory="file:/Users/abhisheksingh/ddrive/everge_ws/pqqReq" auto-startup="true">
<int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000" />
</file:inbound-channel-adapter>
<int:service-activator input-channel="pqqInputFileChannel" ref="pqqHandler" />
<file:inbound-channel-adapter id="pqqOutputFileChannel" directory="/Users/abhisheksingh/ddrive/everge_ws/pqqResp">
<int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="10000" />
</file:inbound-channel-adapter>
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpClientFactory"
auto-create-local-directory="true"
delete-remote-files="false"
local-filter="acceptAllFileListFilter"
local-directory="file:/Users/abhisheksingh/ddrive/everge_ws/f" auto-startup="true" >
<int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000" />
</int-ftp:inbound-channel-adapter>
<int-ftp:inbound-channel-adapter id="pqqFtpInbound"
channel="ftpChannel"
session-factory="pqqFtpClientFactory"
auto-create-local-directory="true"
delete-remote-files="false"
local-filter="acceptAllFileListFilter"
local-directory="file:/Users/abhisheksingh/ddrive/everge_ws/pqqReq" auto-startup="true" >
<int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000" />
</int-ftp:inbound-channel-adapter>
<file:inbound-channel-adapter id="pl" directory="file:/Users/abhisheksingh/ddrive/everge_ws/notifFile" auto-startup="true">
<int:poller id="poller" task-executor="synchTaskExecutor" fixed-delay="1000" />
</file:inbound-channel-adapter>
</beans:beans>
所以有一个我轮询的 ftp 位置,轮询的文件被放置在我本地机器上的以下目录中-
/用户/abhisheksingh/ddrive/test/f
现在有时因为我发现了一个错误并且必须修复它,我停止了 tomcat 服务器。我从本地删除文件,以便下次启动服务器时,可以再次轮询相同的文件。但我发现同一个文件再次被轮询。我的服务器停止了!据我所知,这不应该发生。这就是为什么我发布了我的 application.xml 以了解这里是否有一些东西使线程挂起。还是 spring-integration-ftp 启动了一个不依赖于应用程序的守护线程。请让我解开这个谜。
我在 tomcat 日志中看到以下异常 -
Feb 17, 2017 11:49:24 PM org.apache.catalina.loader.WebappClassLoaderBase loadClass
INFO: Illegal access: this web application instance has been stopped already. Could not load UNIX Type: L8. The eventual following stack trace is caused by an error thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access, and has no functional impact.
java.lang.IllegalStateException
at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1777)
at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1735)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.commons.net.ftp.parser.DefaultFTPFileEntryParserFactory.createFileEntryParser(DefaultFTPFileEntryParserFactory.java:88)
at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2263)
at org.apache.commons.net.ftp.FTPClient.listFiles(FTPClient.java:2046)
at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:70)
at org.springframework.integration.ftp.session.FtpSession.list(FtpSession.java:43)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:236)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:232)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:435)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:232)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:193)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:59)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
在这一点上,我非常确定 spring-integration ftp poller 不会让 tomcat 正确停止。
我确实使用 jvisualvm 进行了更多调查。我看到有启动的任务调度程序线程不会关闭 ./shutdown.sh 调用以停止 tomcat。
有趣的是,如果我删除 pl 或 pqqOutputFileChannel 适配器中的任何一个,这些调度程序就不会被创建。
这是我的耳朵文件的结构 -
以下是即使在 tomcat 上调用了关闭,日志文件也会继续滚动的日志 -
541576 调试 osicPublishSubscribeChannel - 在通道 'errorChannel' 上发布 (sent=true),消息:ErrorMessage [payload=org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[正在运行,池大小 = 1 , 活动线程 = 0, 排队任务 = 0, 已完成任务 = 1114]] 不接受任务:org.springframework.integration.util.ErrorHandlingTaskExecutor$1@570e117d, headers={id=71d77a8b-17ea-7011-3cb3-ddbf7591321f,时间戳 = 1487870801603}] 541575 调试 osicPublishSubscribeChannel - 在通道“errorChannel”上发布(发送 = true),消息:ErrorMessage [payload = org.springframework.core.task.TaskRejectedException:执行程序 [java.util.concurrent.ThreadPoolExecutor@6d6033da [Running ,池大小 = 1,活动线程 = 0,排队任务 = 1,已完成的任务 = 1113]] 不接受任务:org.springframework.integration.util.ErrorHandlingTaskExecutor$1@44210f79, headers={id=74a26d65-4b42-da1f-cdf1-b77fea8bfdb7, timestamp=1487870801603}] 541576 错误 osihandler.LoggingHandler - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d6033da[Running, pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 1113]] 不接受任务: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@7d7cc2a4 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:296) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)在 org.springframework.integration.endpoint。AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201( ScheduledThreadPoolExecutor.java:180) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor $Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 引起:java.util.concurrent.RejectedExecutionException:任务 org.springframework.integration.util.ErrorHandlingTaskExecutor$1@7d7cc2a4 从 java.util.concurrent 被拒绝.ThreadPoolExecutor@6d6033da[正在运行,池大小 = 1,活动线程 = 0,排队任务 = 1,已完成任务 = 1113] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util .concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293)。 .. 还有 11 个在java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 在 java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java: 1369) 在 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多在java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 在 java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java: 1369) 在 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多池大小 = 1,活动线程 = 0,排队任务 = 1,已完成任务 = 1113] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 还有 11 个池大小 = 1,活动线程 = 0,排队任务 = 1,已完成任务 = 1113] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 在 java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 还有 11 个springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) ... 11 更多
PQQ 541778 调试 osifiFtpInboundFileSynchronizer - 1 个文件已传输 541779 信息 cefpEvergeFileListFilter - 检查文件是否已被处理 /Users/abhisheksingh/ddrive/everge_ws/pqqArchive/TEST4.PQQ 541779 信息 cefpEvergeFileListFilter - 存档文件名是 541779 信息 cefpEverge 是输入文件名TEST4 541779 INFO cefpEvergeFileListFilter - 存档文件名为 TEST4 541779 INFO cefpEvergeFileListFilter - 输入文件名为 TEST4 541779 INFO cefpEvergeFileListFilter - 文件已被处理 TEST4 541779 错误 cefpEvergeFileListFilter - PQQ 文件已被处理。541779 调试 osieSourcePollingChannelAdapter - 在轮询期间未收到任何消息,返回“假” 541974 调试 osie