0

我有一条设置为以批处理模式运行的路由,轮询数千个 XML 文件。每个都在 XML 结构内加上时间戳,并且这个 dateTime 元素用于确定 XML 是否应该包含在批处理的进一步处理(XQuery 转换)中。由于这是一个批处理路由,它在执行后会自行终止。

因为路由需要自行关闭,我必须确保如果每条消息都被过滤掉,它也会关闭,这就是为什么我不使用过滤器而是使用.choice()语句并在交换器上设置自定义标头,稍后在对匹配进行分组并为 XQuery 准备单个源文档的 bean。

但是,我目前的方法需要.choice()转发的两个分支到的第二条路线。这是必要的,因为我似乎无法强制两条路径简单地继续。所以我的问题是:如何摆脱第二条路线?一种方法是在 bean 中设置过滤器标头,但我担心所涉及的开销。我假设 Camel 中的 XQuery 过滤器将大大优于 POJO,后者从字符串构建 XML 文档并针对它运行 XQuery。

from(sourcePath + "?noop=true" + "&include=.*.xml")
        .choice()
            .when()
                .xquery("[XQuery Filter]")
                .setHeader("Filtered", constant(false))
                .to("direct:continue")
            .otherwise()
                .setHeader("Filtered", constant(true))
                .to("direct:continue")
.end();

from("direct:continue")
        .routeId(forwarderRouteID)
        .aggregate(aggregationExpression)
            .completionFromBatchConsumer()
            .completionTimeout(DEF_COMPLETION_TIMEOUT)
            .groupExchanges()
        .bean(new FastQueryMerger(), "group")
        .to("xquery:" + xqueryPath)
        .bean(new FileModifier(interval), "setFileName")
        .to(targetPath)
        .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    new RouteTerminator(routeID, exchange.getContext()).start();
                    new RouteTerminator(forwarderRouteID, exchange.getContext()).start();
                }
            })
.end();
4

1 回答 1

1

.end() 在这里没有帮助吗?我的意思是:

from(sourcePath + "?noop=true" + "&include=.*.xml")
    .choice()
        .when()
            .xquery("[XQuery Filter]")
            .setHeader("Filtered", constant(false)).end()
        .otherwise()
            .setHeader("Filtered", constant(true)).end()
    .aggregate(aggregationExpression)
        .completionFromBatchConsumer()
        .completionTimeout(DEF_COMPLETION_TIMEOUT)
        .groupExchanges()
    .bean(new FastQueryMerger(), "group")
    .to("xquery:" + xqueryPath)
    .bean(new FileModifier(interval), "setFileName")
    .to(targetPath)
    .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                new RouteTerminator(routeID, exchange.getContext()).start();
                new RouteTerminator(forwarderRouteID, exchange.getContext()).start();
            }
        });

刚刚快速测试了以下一个并且它有效:

@Produce(uri = "direct:test")
protected ProducerTemplate testProducer;
@EndpointInject(uri = "mock:test-first")
protected MockEndpoint testFirst;
@EndpointInject(uri = "mock:test-therest")
protected MockEndpoint testTheRest;
@EndpointInject(uri = "mock:test-check")
protected MockEndpoint testCheck;

@Test
public void test() {
    final String first = "first";
    final String second = "second";
    testFirst.setExpectedMessageCount(1);
    testTheRest.setExpectedMessageCount(1);
    testCheck.setExpectedMessageCount(2);
    testProducer.sendBody(first);
    testProducer.sendBody(second);
    try {
        testFirst.assertIsSatisfied();
        testTheRest.assertIsSatisfied();
        testCheck.assertIsSatisfied();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

@Override
protected RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
        public void configure() {
            from("direct:test")
                .choice()
                    .when(body().isEqualTo("first")).to("mock:test-first")
                    .otherwise().to("mock:test-therest").end()
                    .to("mock:test-check");
        }
    };
}
于 2013-05-30T08:50:57.333 回答