2

我对 karaf 有一个棘手的问题,并且尝试了一整天来解决它,我需要你的见解。这是问题所在:

我有骆驼路由(纯 java DSL),它从 2 个源获取数据,处理它们,然后将结果发送到 redis - 当用作独立应用程序时(使用 Main 类和命令行“java -jar myjar.jar” ),在不到 20 分钟的时间内处理和保存数据 - 当将它们作为捆绑使用时(实际上是另一个功能的一部分),在同一台机器上,大约需要 10 小时

编辑:我忘了补充:我使用骆驼 2.1.0 和 karaf 2.3.2

现在,我们正在将我们的 SI 重构为 karaf 功能,所以遗憾的是,仅保留独立应用程序是不可能的。

我尝试使用 karaf java 内存选项,使用集群(我失败:d)玩 SEDA 和线程池,用 seda 替换所有直接路由,但没有成功。dev:create-dump 显示了很多

thread #38 - Split" Id=166 BLOCKED on java.lang.Class@56d1396f owned by "Camel (camelRedisProvisioning)

karaf 中的 split 和 parallelProcessing 是否存在问题?独立应用程序确实显示了更多的 CPU 活动。

这是我的骆驼路线

//start with a quartz and a cron tab
from("quartz://provisioning/topOffersStart?cron=" + cronValue.replace(' ',  '+')).multicast()
        .parallelProcessing().to("direct:prodDAO", "direct:thesaurus");

//get from two sources and process
from("direct:prodDAO").bean(ProductsDAO.class)
.setHeader("_type", constant(TopExport.PRODUCT_TOP))
.setHeader("topOffer", constant("topOffer"))
.to("direct:topOffers");

from("direct:thesaurus")
.to(thesaurusUri).unmarshal(csv).bean(ThesaurusConverter.class, "convert")
.setHeader("_type", constant(TopExport.CATEGORY_TOP))
.setHeader("topOffer", constant("topOffer"))
.to("direct:topOffers");


//processing  
from("direct:topOffers").choice()
        .when(isCategory)
            .to("direct:topOffersThesaurus")
        .otherwise()
            .when(isProduct)
                .to("direct:topOffersProducts")
            .otherwise()
                .log(LoggingLevel.ERROR, "${header[_type]} is not valid !")
            .endChoice()
        .endChoice()
    .end();

from("direct:topOffersThesaurus")
//here is where I think the problem comes
        .split(body()).parallelProcessing().streaming()
        .bean(someprocessing)
        .to("direct:toRedis");

from("direct:topOffersProducts")
//here is where I think the problem comes
        .split(body()).parallelProcessing().streaming()
        .bean(someprocessing)
        .to("direct:toRedis");

//save into redis
from("direct:toRedis")
        .setHeader("CamelRedis.Key", simple("provisioning:${header[_topID]}"))
        .setHeader("CamelRedis.Command", constant("SETEX"))
        .setHeader("CamelRedis.Timeout", constant("90000"))//25h
        .setHeader("CamelRedis.Value", simple("${body}"))
.to("spring-redis://?redisTemplate=#provisioningRedisTemplateStringSerializer");

注意:发送到 direct:topOffers[products|thesaurus] 的正文是 pojo 的列表(同一类)

感谢任何可以提供帮助的人

编辑:我想我把它缩小到 jaxb 的僵局。事实上,在我的路线中,我多次调用调用 Web 服务的 java 客户端。使用 karaf 时,线程被阻塞: java.lang.Thread.State: BLOCKED (on object monitor) at com.sun.xml.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:78)

进一步向下堆栈跟踪,我们看到用于转换对象中的 xml 的解组方法,我们怀疑我怀疑的那 2 行

final JAXBContext context = JAXBContext.newInstance(clazz.getPackage().getName());
final Unmarshaller um = context.createUnmarshaller();

我删除了最终的,没有改进。也许与 karaf 使用的 jaxb 有关?我没有在捆绑包中安装任何 jaxb impl

4

1 回答 1

0

搞定了 !如上所示,它确实与我的 web 服务客户端中 jaxb 上下文的死锁有关。我做了什么: - 通过删除 Marshaller/Unmarshaller 对象上的 final 关键字来重构该客户端的旧代码(我认为死锁来自那里,即使它在独立运行时是完全相同的代码) - 实例化基于上下文包装上,而且只有一次。我必须承认 OSGI 的 Classloader 问题让我在桌子上敲了几个小时,但是感谢为什么 JAXB 在 Apache Felix 中运行时找不到我的 jaxb.in​​dex?,我设法解决了这个问题

当然,我的线程现在正在休眠而不是阻塞,但现在我在不到 30 分钟的时间内处理了我的数据,所以这对我来说已经足够了

于 2013-08-09T13:26:15.330 回答