2

我有以下骆驼路线:

<route id="myRoute">
    <from uri="direct:aggregator" />

    <aggregate strategy="aggregatorStrategy" completionInterval="60000" completionSize="500">
        <correlationExpression>
            <xpath>/fizz/buzz</xpath>
        </correlationExpression>

        <to uri="bean:postProcessor?method=run" />
    </aggregator>
</route>

如您所见,它聚合<aggregator/>接收到的前 500 条消息,或 1 分钟间隔内的所有消息,然后将聚合消息发送到名为postProcessor.

你可以这样想这个聚合逻辑:

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed

THEN:
    Send to postProcessor

或者在伪代码中:aggregateUntil(weHave500Message() || 1minHasElapsed()). 我想将此逻辑更改为:

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed
    OR
    A message is received that has a property called "fireNow" and a value of "true"

THEN:
    Send to postProcessor

或者,再次以伪代码:aggregateUntil(weHave500Message() || 1minHasElapsed() || messageHasProperty("fireNow", "true")).

换句话说,聚合直到满足 3 个条件中的任何一个。有什么想法可以实现吗?我有一种感觉completionPredicate,也许我可以解决这个问题eagerCheckCompletion,但在这里看不到森林。

4

1 回答 1

4

您可以将completionSizeandcompletionInterval一起使用并将 a 添加completionPredicate到等式中,如下所示(您确实需要使用eagerCheckCompletion="true",因为我们需要测试传入消息,而不是聚合消息):

<route>
    <from uri="direct:aggregator" />
    <aggregate completionSize="500" completionInterval="60000" eagerCheckCompletion="true">
        <correlationExpression>
             <xpath>/fizz/buzz</xpath>
        </correlationExpression>
        <completion-predicate>
             <simple>${property.fireNow} == 'true'</simple>
        </completion-predicate>
        <to uri="bean:postProcessor?method=run" />
    </aggregate>
</route>

另一种方法是创建一个复合谓词来测试所有三个条件并仅使用该谓词,completion-predicate但此解决方案较差,因为间隔超时由单独的线程触发,而使用复合谓词则没有线程触发聚合超时到期。

于 2014-02-06T00:28:06.410 回答