在所描述的场景中,我将使用聚合器(我们称之为“AggregationRoute”)将拆分的消息发送到一个路由,它的聚合策略实现了 PreCompletionAwareAggregationStrategy(我猜你已经在使用它的方式)。然后,当拆分结束时,将 AGGREGATION_COMPLETE_ALL_GROUPS 标头设置为 true 并将其发送到 AggregationRoute。此交换将仅用作完成所有聚合组的信号。
例子:
...
.split(body()).streaming()
.to("direct:aggregationRoute")
.end()
.setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
.to("direct:aggregationRoute");
from("direct:aggregationRoute")
.aggregate([your correlation expression]), myAggregationStrategy)
...
另一种选择是使用 AggregateController 通过调用其方法 forceCompletionOfAllGroups() 来结束所有组的聚合:
AggregateController aggregateController = new DefaultAggregateController();
from(...)
...
.split(body()).streaming()
.aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
...
// Do what you need to do with the aggregated exchange
...
.end()
.end()
.bean(aggregateController, "forceCompletionOfAllGroups")