3

我写了一小块骆驼来消费一个ftp服务器。

但是在运行一段时间后,它会抛出异常,继续运行但不再消耗任何东西。此外,当我再次启动它并且有大量文件等待消耗时,它会再次崩溃。我已经添加了一个异常处理程序,但似乎没有捕获异常。

这是我收到的例外:

Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - File operation failed: 150 Opening ASCII mode data connection for 2386442.XML(3895 bytes).
Accept timed out. Code: 150]
org.apache.camel.component.file.GenericFileOperationFailedException: File operation failed: 150 Opening ASCII mode data connection for 2386442.XML(3895 bytes).
Accept timed out. Code: 150
    at org.apache.camel.component.file.remote.FtpOperations.retrieveFileToStreamInBody(FtpOperations.java:336)
    at org.apache.camel.component.file.remote.FtpOperations.retrieveFile(FtpOperations.java:297)
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:333)
    at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:94)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:175)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:136)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:140)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:92)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
    at java.net.ServerSocket.implAccept(ServerSocket.java:462)
    at java.net.ServerSocket.accept(ServerSocket.java:430)
    at org.apache.commons.net.ftp.FTPClient._openDataConnection_(FTPClient.java:560)
    at org.apache.commons.net.ftp.FTPClient.retrieveFile(FTPClient.java:1442)
    at org.apache.camel.component.file.remote.FtpOperations.retrieveFileToStreamInBody(FtpOperations.java:328)
    ... 16 more
Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: GenericFile[2386448.XML] from: Endpoint[ftp://1.1.1.1?delay=15000&delete=true&disconnect=true&exclude=((?i).*pdf$)&password=******&username=user]
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: GenericFile[2386448.XML] from:   Endpoint[ftp://1.1.1.1?delay=15000&delete=true&disconnect=true&exclude=((?i).*pdf$)&password=******&username=user]
   at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:338)
  at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:94)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:175)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:136)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:140)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:92)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

这是我使用 Java DSL 制作的路线:

    // XML Predicate
    // only allows names without spaces
    Predicate xmlPredicate = header(RssUtils.CAMEL_FILE_NAME).regex(
            "([\\S]+(\\.(?i)(xml))$)");
    // Images Predicate
    // only allows names without spaces
    Predicate imgPredicate = header(RssUtils.CAMEL_FILE_NAME).regex(
            "([\\S]+(\\.(?i)(jpg|png|gif))$)");

    onException(SchemaValidationException.class).to(
            "file://" + props.getProperty(RssUtils.ROOT_DIR)
                    + "/errors/SchemaValidationException");

    onException(GenericFileOperationFailedException.class).to(
            "file://" + props.getProperty(RssUtils.ROOT_DIR)
                    + "/errors/GenericFileExceptions");

    from(
            "ftp://"
                    + props.getProperty(RssUtils.FTP_URL)
                    + "?username="
                    + props.getProperty(RssUtils.FTP_USER)
                    + "&password="
                    + props.getProperty(RssUtils.FTP_PWD)
                    + "&disconnect=true&delete=true&exclude=((?i).*pdf$)&delay="
                    + props.getProperty(RssUtils.FTP_DELAY))
            .choice()
            .when(xmlPredicate)
            .to("jms:xmlQueue")
    .to("jms:archiveQueue")
            .when(imgPredicate)
            .to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/img")
            .otherwise()
            .to("file://" + props.getProperty(RssUtils.ROOT_DIR)
                    + "/errors/other");

    from("jms:xmlQueue").to("validator:FtpXmlValidator.xsd")
            .to("xslt://XmlToRssConverter.xsl")
            .process(rssFeedProcessor)
            .to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/rss/");

from("jms:archiveQueue")
    .to("file://" + props.getProperty(RssUtils.ROOT_DIR) + "/archive/");

我能做些什么来避免这种行为吗?这真的很难测试,所以我希望有人发现我的代码中的缺陷。我现在已经搜索了很长一段时间,但我没有找到任何可靠的东西。也许我可以通过某种方式调试这个问题?

也许有几件事我发现有人可以给出他的强硬:

  • 使用 onException 时使用handled(true)
  • 我可以设置消费者的最大批量大小吗?(我可以为此使用油门吗?)
  • 最后使用显式尝试捕获,因为我使用的是 Java DSL

如果我在这里说错了,请不要开枪,我只是在学习 Camel。因此,如果有人对上面的代码有任何建议,我将不胜感激!

提前非常感谢!

4

2 回答 2

1

您在这里遇到的是一个 FTP 问题,它发生在 Apache Camel 中的事实在很大程度上是无关紧要的。

炸弹的关键部分是:

在 java.net.PlainSocketImpl.socketAccept(Native Method) 在 java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408) 在 java.net.ServerSocket.implAccept(ServerSocket.java:462) 在 java.net.ServerSocket.accept (ServerSocket.java:430) 在 org.apache.commons.net.ftp.FTPClient。openDataConnection (FTPClient.java:560)

org.apache.commons.net.ftp.FTPClient的openDataConnection方法支持主动模式 FTP - 被动模式只使用与命令相同的端口,因此不需要单独的端口连接。

尝试切换到被动模式(对于 Apache Camel,passiveMode = true)。

于 2015-10-23T11:12:58.067 回答
0

从表面上看,不看为什么你的路线会失败,听起来你想要做的是处理并继续——即,处理这个异常并从你离开的地方继续你的路线。根据文档:

从 Camel 2.3 开始可用
在 Camel 2.3 中,我们引入了一个新选项continued,它允许您处理继续在原始路由中路由,就好像没有发生异常一样。

例如,如果抛出 IDontCareException 则忽略并继续,我们可以这样做:

onException(IDontCareException).continued(true);

这里发生的是:

Camel 将捕获异常并且 . . . 忽略它并继续在原始路由中路由。然而 。. . 它将首先路由该 [onException] 路由,然后它将继续在原始路由中路由。

试试这个,它可能会解决你的问题。正如我在上面暗示的那样,根据您的根本问题是什么,这可能更像是一个创可贴,而不是一个适当的解决方案。更好的方法可能是找出 FTP 使用者失败的原因。乍一看,似乎找不到名为2386448.XML.

一旦确定了根本原因,就可以使用 achoice在正确的时间做出不同的行为,如下所示:

.choice()
    .when(isValidFtpResponse())
         .to(DIRECT_CONTINUE_FTP_ROUTE)
    .otherwise()
        .setBody(constant(null))
        .log(ERROR, "FTP failed: ${headers}")
.end()

希望这能给你一些想法并帮助你通过这个问题。

于 2012-09-11T16:37:00.200 回答