2

我实际上想要一个刷新,或者一个completionSize,但是对于聚合器中的所有聚合。就像一个全局的completionSize。

基本上,我想确保批处理中的每条消息都被聚合,然后在读取最后一条消息时立即完成该聚合器中的所有聚合。

e.g. 1000 messages arrive (the length is not known beforehand)

     aggregate on correlation id into bins
        A 300
        B 400
        C 300   (size of the bins is not known before hand)

     I want the aggregator not to complete until the 1000th exchange is aggregated
     thereupon I want all of the aggregations in the aggregator to complete at once

CompleteSize 适用于每个聚合,而不是聚合器作为一个整体。因此,如果我设置 CompleteSize( 1000 ) 它永远不会完成,因为每个聚合必须超过 1000 才能“完成”

我可以通过构建单个 Map 对象来解决它,但这有点回避 aggregator2 中的相关性,我更喜欢理想地使用它

所以是的,无论是全局完整大小还是刷新,有没有办法智能地做到这一点?

4

3 回答 3

3

一种选择是简单地添加一些逻辑来保持全局计数器并在Exchange.AGGREGATION_COMPLETE_ALL_GROUPS到达时设置标题......

从 Camel 2.9 开始可用...您可以通过发送包含标头 Exchange.AGGREGATION_COMPLETE_ALL_GROUPS 设置为 true 的消息来手动完成所有当前的聚合交换。该消息仅被视为信号消息,否则不会处理消息标题/内容。

于 2013-02-25T16:24:30.497 回答
0

我建议看一下 Camel aggregator eip doc http://camel.apache.org/aggregator2,并了解不同的完成条件。此外,Ben 提到的特殊消息可以发送给信号以完成所有飞行中的聚合。

如果您从批处理消费者http://camel.apache.org/batch-consumer.html消费,那么您可以使用在批处理完成时完成的特殊完成。例如,如果您从 JPA 数据库表中提取文件或行等。然后,当批处理使用者的所有消息都已处理后,聚合器可以使用 completionFromBatchConsumer 选项发出所有这些聚合消息的完成信号。

此外,如果您有 Camel in Action 书籍的副本,请阅读第 8 章第 8.2 节,因为它所有关于聚合 EIP 的内容都涵盖了更多详细信息。

于 2013-02-26T15:51:15.677 回答
0

使用 Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE 为我工作:

from(endpoint)
        .unmarshal(csvFormat)
        .split(body())
        .bean(CsvProcessor())
        .choice()
        // If all messages are processed,
        // flush the aggregation
        .`when`(simple("\${property.CamelSplitComplete}"))
        .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, constant(true))
        .end()
        .aggregate(simple("\${body.trackingKey}"),
                AggregationStrategies.bean(OrderAggregationStrategy()))
        .completionTimeout(10000)
于 2019-09-07T14:40:46.627 回答