3

对于多播+聚合,我有以下奇怪的(或至少我不清楚)行为。考虑以下路线:

    from("direct:multicaster")
                .multicast()
                .to("direct:A", "direct:B")
                .aggregationStrategy(new AggregationStrategy() {
                    @Override
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange == null) {
                            List firstResult = newExchange.getIn().getBody(List.class);
                            newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                            return newExchange;
                        } else {
                            List oldResults = oldExchange.getIn().getBody(List.class);
                            List newResults = newExchange.getIn().getBody(List.class);
                            ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                            oldExchange.getIn().setBody(aggResult);
                            return oldExchange;
                        }
                    }
                })
                .end()
//                .to("log:bla")

本质上,这条路由接受一个输入,将其发送到direct:Aand direct:B,期望来自这两个端点的列表并将它们连接起来(最后一行中的注释是有原因的,我稍后会解释)。

现在假设这两个端点分别“返回”列表 [A] 和 [B]。如果我将消息发送M到,则使用anddirect:multicaster调用一次聚合器,然后使用and (正如它应该做的那样)。oldExchange = nullnewExchange.in.body=[A]oldExchange.in.body=[A]newExchange.out.body=[B]

到目前为止一切都很好。但是聚合器再次被调用,oldExchange.in.body=[A,B]并且newExchange.in=M(M是初始消息)。这看起来类似于包含的扩充模式。

您可以通过删除最后一行中的注释来获得预期的行为,即只需添加一个 dummy to("log:bla")。有了这个,一切都按预期运行。

更新:尝试(参见克劳斯提供的提示)

            .multicast()
            .aggregationStrategy(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

            .multicast(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

两者都导致相同的行为。

这里发生了什么 - 我做错了什么?

在此先感谢马库斯

4

3 回答 3

2

我试图重现该问题,但没有成功。这就是我所做的:

路线:

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end();

        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" :
                    oldExchange.getIn().getBody().toString()) + ", newExchange = " +
                    newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}

创建一个简单的测试只是为了运行路线。

考试:

public class MulticastRouteTest extends CamelTestSupport {
  @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        template.sendBody("direct:multicast", null);
    }
}

这打印:

Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]

这是我们所期望的。希望这会帮助你。我看不出我做事的方式有什么不同,但希望你能发现它。

于 2013-10-23T07:45:53.173 回答
0

我有同样的问题。2 件事似乎很关键

  • 将 AggregationStrategy 设置为之前(我会直接将其设置为多播的参数)
  • 用“end()”结束多播

如果您查看每个节点的返回类型,我认为您可以看到差异

于 2016-11-04T11:59:51.983 回答
0

原因是聚合策略是跨不同对象实例的共享代码块。因此,每当创建实例时,差异就会出现。

于 2020-10-08T23:58:05.577 回答