0

我正在使用 Apache Camel 并获取一个大文件作为输入,我必须逐行处理。内容已经排序,我必须聚合所有具有相同关联键的连续行。如果关联键发生变化,则必须完成先前的聚合。如果文件结束,则最后一个聚合也已完成。我有一些限制: - 因为传入的文件相当大,我们希望以流方式处理它。- 因为将结果提供给同步端点,所以我不想使用超时完成谓词。否则我将失去调节数据源消耗速度的背压,并且交换将累积在 AggregateProcessor 的超时映射和聚合存储库中。

PreCompletionAwareAggregationStrategy 看起来是一个很有前途的解决方案,但事实证明,在下一个文件到达之前,最后一个聚合不会完成。如果我在 preComplete 中使用 CamelSplitComplete 属性,则最后一个聚合完成但没有最后一个传入交换。相反,这最后一个交换将被添加到下一个到达的文件的内容中。

所以目前我很迷失找到一个不太难看的解决方案。

4

2 回答 2

0

好吧,也许一种方法可能是因为您的数据已经排序,所以以流方式解析并将具有相同相关键的每一行添加到某个 hashmap 结构中。一旦遇到新的相关键,您基本上想要“刷新”哈希图以创建新消息,然后重新启动相同的过程。看看这里: http ://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html

于 2016-11-12T02:33:22.387 回答
0

在所描述的场景中,我将使用聚合器(我们称之为“AggregationRoute”)将拆分的消息发送到一个路由,它的聚合策略实现了 PreCompletionAwareAggregationStrategy(我猜你已经在使用它的方式)。然后,当拆分结束时,将 AGGREGATION_COMPLETE_ALL_GROUPS 标头设置为 true 并将其发送到 AggregationRoute。此交换将仅用作完成所有聚合组的信号。

例子:


    ...
    .split(body()).streaming()
        .to("direct:aggregationRoute")
    .end()
    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
    .to("direct:aggregationRoute");

from("direct:aggregationRoute")
    .aggregate([your correlation expression]), myAggregationStrategy)
    ...

另一种选择是使用 AggregateController 通过调用其方法 forceCompletionOfAllGroups() 来结束所有组的聚合:


AggregateController aggregateController = new DefaultAggregateController();

from(...)
    ...
    .split(body()).streaming()
        .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
            ...
            // Do what you need to do with the aggregated exchange
            ...
        .end()
    .end()
    .bean(aggregateController, "forceCompletionOfAllGroups")
于 2016-11-17T22:07:48.357 回答