流组件 将为文件的每一行创建一个新的交换。
流组件提供了在 XX 行上拆分的选项(默认为 0):
groupLines 0 Camel 2.5:对消费者中的 X 行进行分组。例如,将 10 行分组,因此只吐出一个 10 行的 Exchange,而不是每行 1 个 Exchange。
此选项用于创建 XX 行的新交换。下面是StreamConsumer的相关代码:
if (lines.size() >= endpoint.getGroupLines()) {
// spit out lines
Exchange exchange = endpoint.createExchange();
// create message with the lines
Message msg = new DefaultMessage();
List<String> copy = new ArrayList<String>(lines);
msg.setBody(endpoint.getGroupStrategy().groupLines(copy));
exchange.setIn(msg);
// clear lines
lines.clear();
getProcessor().process(exchange);
}
每次交换完成后都会执行onCompletion 。
来自骆驼文档(onCompletion 文档):
始终触发,仅在成功完成时触发,或仅在失败时触发
因此,您的代码在读取第一行后停止路由。
显然,实际上没有办法知道文件的结尾何时发生在流组件中。
一个不错的方法可能是在splitter中拥有一个 CamelSplitComplete 属性。我们可以这样使用(参见onCompletion 和 onWhen 谓词章节):
onCompletion()
.onWhen(property("CamelSplitComplete").isEqualTo("true"))
.setBody(simple("")).bean(new Stop(getContext(), transferID));
编辑:我只是在寻找流消费者,但因为你也在使用生产者。查看 closeOnDone 属性,这可能会奏效。
关闭完成 | 假 | Camel 2.11.0:此选项与 Splitter 结合使用并流式传输到同一文件。这个想法是保持流打开并且仅在拆分器完成时关闭,以提高性能。请注意,这要求您只流式传输到同一个文件,而不是 2 个或更多文件。