我写了一小块骆驼来消费一个ftp服务器。
但是在运行一段时间后,它会抛出异常,继续运行但不再消耗任何东西。此外,当我再次启动它并且有大量文件等待消耗时,它会再次崩溃。我已经添加了一个异常处理程序,但似乎没有捕获异常。
这是我收到的例外:
Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - File operation failed: 150 Opening ASCII mode data connection for 2386442.XML(3895 bytes).
Accept timed out. Code: 150]
org.apache.camel.component.file.GenericFileOperationFailedException: File operation failed: 150 Opening ASCII mode data connection for 2386442.XML(3895 bytes).
Accept timed out. Code: 150
at org.apache.camel.component.file.remote.FtpOperations.retrieveFileToStreamInBody(FtpOperations.java:336)
at org.apache.camel.component.file.remote.FtpOperations.retrieveFile(FtpOperations.java:297)
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:333)
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:94)
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:175)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:136)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:140)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:92)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
at java.net.ServerSocket.implAccept(ServerSocket.java:462)
at java.net.ServerSocket.accept(ServerSocket.java:430)
at org.apache.commons.net.ftp.FTPClient._openDataConnection_(FTPClient.java:560)
at org.apache.commons.net.ftp.FTPClient.retrieveFile(FTPClient.java:1442)
at org.apache.camel.component.file.remote.FtpOperations.retrieveFileToStreamInBody(FtpOperations.java:328)
... 16 more
Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: GenericFile[2386448.XML] from: Endpoint[ftp://1.1.1.1?delay=15000&delete=true&disconnect=true&exclude=((?i).*pdf$)&password=******&username=user]
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: GenericFile[2386448.XML] from: Endpoint[ftp://1.1.1.1?delay=15000&delete=true&disconnect=true&exclude=((?i).*pdf$)&password=******&username=user]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:338)
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:94)
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:175)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:136)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:140)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:92)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
这是我使用 Java DSL 制作的路线:
// XML Predicate
// only allows names without spaces
Predicate xmlPredicate = header(RssUtils.CAMEL_FILE_NAME).regex(
"([\\S]+(\\.(?i)(xml))$)");
// Images Predicate
// only allows names without spaces
Predicate imgPredicate = header(RssUtils.CAMEL_FILE_NAME).regex(
"([\\S]+(\\.(?i)(jpg|png|gif))$)");
onException(SchemaValidationException.class).to(
"file://" + props.getProperty(RssUtils.ROOT_DIR)
+ "/errors/SchemaValidationException");
onException(GenericFileOperationFailedException.class).to(
"file://" + props.getProperty(RssUtils.ROOT_DIR)
+ "/errors/GenericFileExceptions");
from(
"ftp://"
+ props.getProperty(RssUtils.FTP_URL)
+ "?username="
+ props.getProperty(RssUtils.FTP_USER)
+ "&password="
+ props.getProperty(RssUtils.FTP_PWD)
+ "&disconnect=true&delete=true&exclude=((?i).*pdf$)&delay="
+ props.getProperty(RssUtils.FTP_DELAY))
.choice()
.when(xmlPredicate)
.to("jms:xmlQueue")
.to("jms:archiveQueue")
.when(imgPredicate)
.to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/img")
.otherwise()
.to("file://" + props.getProperty(RssUtils.ROOT_DIR)
+ "/errors/other");
from("jms:xmlQueue").to("validator:FtpXmlValidator.xsd")
.to("xslt://XmlToRssConverter.xsl")
.process(rssFeedProcessor)
.to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/rss/");
from("jms:archiveQueue")
.to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/archive/");
我能做些什么来避免这种行为吗?这真的很难测试,所以我希望有人发现我的代码中的缺陷。我现在已经搜索了很长一段时间,但我没有找到任何可靠的东西。也许我可以通过某种方式调试这个问题?
也许有几件事我发现有人可以给出他的强硬:
- 使用 onException 时使用handled(true)
- 我可以设置消费者的最大批量大小吗?(我可以为此使用油门吗?)
- 最后使用显式尝试捕获,因为我使用的是 Java DSL
如果我在这里说错了,请不要开枪,我只是在学习 Camel。因此,如果有人对上面的代码有任何建议,我将不胜感激!
提前非常感谢!