1

我对以下路线有一些问题:

// from("cxf:....")...
from("direct:start").process(startRequestProcessor) // STEP 1
            .choice()
                .when(body().isNull())
                        .to("direct:finish")
                .otherwise()
                    .split(body())  // STEP 2
                    .bean(TypeMapper.class) // STEP 3
                    .log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}")
                    .recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4
                    .ignoreInvalidEndpoints();

    from("direct:endpoint2") // STEP 6
            .log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}")
            .recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader")
            .ignoreInvalidEndpoints();

    from("direct:endpoint1.1") // STEP 5
            .process(new DateRangeProcessor())
            .to("direct:collections");

    from("direct:endpoint1.2") // STEP 5
            .process(new SingleProcessor())
            .to("direct:collections");


    from("direct:endpoint2.2") // STEP 7
            .aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy)
            .completionSize(exchangeProperty("endpoint22"))

            .process(new QueryBuilderProcessor())
            .bean(MyService, "getDbCriteria")

            .setHeader("collection", constant("endpoint2.1"))
            .to("direct:endpoint2.1").end();


    from("direct:endpoint2.1") // STEP 8
            .aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy)
            .completionSize(exchangeProperty("CamelSplitSize"))
            .to("direct:finish").end();

    from("direct:finish")
            .process(new QueryBuilderProcessor())
            .bean(MyRepository, "findAll")
            .log("ResponseData: ${body}").
            marshal().json(JsonLibrary.Gson).end();

路线

  1. 接收 json 字符串并将其转换为 JSONObjects 的列表 (HashSet)。
  2. 将接收到的列表拆分为 json 对象。
  3. 根据对象内容设置对应的headers
  4. 根据标头将消息路由到端点1.1或端点1.2
  5. 将消息转换为mongodb Criteria并发送到endpoint2
  6. Endpoint2 根据另一个标头将消息路由到endpoint2.1 或endpoint2.2。
  7. Endpoint2.2 聚合所有收到的消息,对其进行处理以获取 mongodb Criteria 并将其发送到 endpoint2.1(completionSize 在步骤 2 中计算并保存在属性“endpoint22”中)。
  8. Enpoint2.1 聚合所有消息(CamelSplitSize)将聚合消息转换为 Query 对象并将其发送到 Repository 以检索数据。

我可以在调试器中看到有效的响应对象,但无论如何我得到一个错误:

没有为类 java.util.HashSet 找到消息正文编写器,ContentType: application/json

问题不在响应对象中,因为它与其他路由一起使用并且不包含 HashSet。

我的猜测是路由将 HashSet 创建的 tat STEP 1 发送到输出...

我的问题是:

  • 路由输出有什么问题?
  • 两个收件人列表()都尝试将消息转发到无效端点(我必须使用 .ignoreInvalidEndpoints() 以避免异常):

    org.apache.camel.NoSuchEndpointException: No endpoint could be found for: org.springframework.data.mongodb.core.query.Criteria@20f55e70,请检查您的类路径是否包含所需的 Camel 组件 jar。

任何帮助将非常感激! 谢谢。

4

1 回答 1

2

我觉得很奇怪,但是 .aggregate() 函数不回复交换。它使用你的聚合策略,但总是回复传入的交换。这在阅读文档时并不清楚,但是您必须使用聚合策略以及 split() 才能返回交换。

于 2016-04-17T12:18:34.093 回答