0

我们有目前的情况。我使用 Apache Camel,为小型交换拆分大文件(使用拆分器,见下文)并验证它们。然后我需要聚合消息,但我使用聚合器,它需要设置编译大小或其他。我可以在不设置限制的情况下汇总当前文档中的所有交换吗?

我的路线:

 from("file:data?noop=true?move={{package.success}}&moveFailed={{package.failed}}")
                .transacted()
                .split(ExpressionBuilder.beanExpression(new InvoiceIteratorFactory(), "createIterator"))
                .streaming()
                .process(new ValidatorProcessor())
                .choice()
                .when(new Predicate() {
                    @Override
                    public boolean matches(Exchange exchange) {
                        return exchange.getContext().getProperty(ValidatorProcessor.STATE_PROPERTY).equals(ValidatorProcessor.STATE_SUCCESS);
                    }
                })
                .to("jpa:/...")
                .otherwise()
                .aggregate(body(String.class), new MyAggregationStrategy()).completionSize(????)
                .to("smtps://smtp.gmail.com?username={{remote.e-mail}}&password={{remote.password}}");

要设置聚合器,我用来设置交换次数或时间,但我不知道会有多少次交换。

4

1 回答 1

1

CamelSplitComplete因此,Camel 中的拆分器 EIP每次完成拆分交换时都会生成一个标头。此标头是一个布尔值。

我要做的是completionPredicate()在聚合器中使用而不是completionSize(). 因此,只要该标头为真,它将完成聚合:

from("file:data?noop=true?move={{package.success}}&moveFailed={{package.failed}}")
    .transacted()
    .split(ExpressionBuilder.beanExpression(new InvoiceIteratorFactory(), "createIterator"))
    .streaming()
    .process(new ValidatorProcessor())
    .choice()
    .when(new Predicate() {
                @Override
                public boolean matches(Exchange exchange) {
                    return exchange.getContext().getProperty(ValidatorProcessor.STATE_PROPERTY).equals(ValidatorProcessor.STATE_SUCCESS);
                }
            })
     .to("jpa:/...")
     .otherwise()
     .aggregate(body(String.class), new MyAggregationStrategy()).completionPredicate(header("CamelSplitComplete") == true)
     .to("smtps://smtp.gmail.com?username={{remote.e-mail}}&password={{remote.password}}");

我希望这就是你要找的。

于 2014-05-12T16:17:05.963 回答