我有一个使用文件夹中的 XML 文件的批处理路由。它过滤、转换并最终将分组文档保存到磁盘。由于这是一个批处理路由,我要求在对源文件夹进行一次轮询后将其关闭,这就是下面代码中 RouteTerminator 的用途。(它用.调用stopRoute()
并removeRoute()
继续。)camelContext
routeID
from("file:" + sourcePath)
.filter().xquery("//DateTime > xs:dateTime('2013-05-07T23:59:59')")
.filter().xquery("//DateTime < xs:dateTime('2013-05-09T00:00:00')")
.aggregate(constant(true))
.completionFromBatchConsumer()
.groupExchanges()
.to("xquery:" + xqueryPath)
.to("file:" + targetPath)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
new RouteTerminator(routeID, exchange.getContext()).start();
}
})
.end();
这会在单个文件收集后正确关闭路由,并且在其中重复该过程后onException
,也会在引发异常时优雅地关闭路由。不幸的是,如果路由过滤掉每个 Exchange,它就永远不会到达处理器。相反,在过滤器期间交换被丢弃,并且路由保持打开状态。
我想在aggregate
调用中移动过滤器,因为这可能会使路由一直持续到最后,但是这种方法不会接受 XQuery 过滤器。XPath 不是一个选项,因为它不支持 dateTime 比较。
在这种情况下,如何强制整个路线停止?