对于多播+聚合,我有以下奇怪的(或至少我不清楚)行为。考虑以下路线:
from("direct:multicaster")
.multicast()
.to("direct:A", "direct:B")
.aggregationStrategy(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List firstResult = newExchange.getIn().getBody(List.class);
newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
return newExchange;
} else {
List oldResults = oldExchange.getIn().getBody(List.class);
List newResults = newExchange.getIn().getBody(List.class);
ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
oldExchange.getIn().setBody(aggResult);
return oldExchange;
}
}
})
.end()
// .to("log:bla")
本质上,这条路由接受一个输入,将其发送到direct:A
and direct:B
,期望来自这两个端点的列表并将它们连接起来(最后一行中的注释是有原因的,我稍后会解释)。
现在假设这两个端点分别“返回”列表 [A] 和 [B]。如果我将消息发送M
到,则使用anddirect:multicaster
调用一次聚合器,然后使用and (正如它应该做的那样)。oldExchange = null
newExchange.in.body=[A]
oldExchange.in.body=[A]
newExchange.out.body=[B]
到目前为止一切都很好。但是聚合器再次被调用,oldExchange.in.body=[A,B]
并且newExchange.in=M
(M
是初始消息)。这看起来类似于包含的扩充模式。
您可以通过删除最后一行中的注释来获得预期的行为,即只需添加一个 dummy to("log:bla")
。有了这个,一切都按预期运行。
更新:尝试(参见克劳斯提供的提示)
.multicast()
.aggregationStrategy(aggStrategy)
.to("direct:A", "direct:B")
.end()
和
.multicast(aggStrategy)
.to("direct:A", "direct:B")
.end()
两者都导致相同的行为。
这里发生了什么 - 我做错了什么?
在此先感谢马库斯