我有一个场景,一条消息可以生成多条消息以独立于原始消息进行处理。
我已经尝试过拆分器 EIP,但它看起来有一个默认的聚合策略(我似乎无法关闭)。
我可以实现这条路线:
from("direct:in").to("bean:multipleMsgGenerator").to("direct:out")
在哪里 multipleMsgGenerator 可以向“direct:out”发送 n 条独立消息?
谢谢!
我有一个场景,一条消息可以生成多条消息以独立于原始消息进行处理。
我已经尝试过拆分器 EIP,但它看起来有一个默认的聚合策略(我似乎无法关闭)。
我可以实现这条路线:
from("direct:in").to("bean:multipleMsgGenerator").to("direct:out")
在哪里 multipleMsgGenerator 可以向“direct:out”发送 n 条独立消息?
谢谢!
默认情况下,拆分器返回原始消息(camel_version >= 2.3)。请参阅http://camel.apache.org/splitter.html。也就是说,您可以对溢出块内的各个拆分消息执行任何您想做的事情。
例如,如果消息的正文是 List,您可以这样做:
from ("direct:in)
.split(body())
.to("direct:processIndividualMsg")
.end()
.to("direct:doSomethingWithTheOriginalMsg");
对于任何发现此问题的人,我们找到了适合我们的解决方案。
我们收到一条需要转发多条消息的消息。这可以使用以下方法完成: 1. 一个自定义处理器,它创建一个 List 并将其设置到交换的主体中
2. 在自定义处理器之后的 split().body()
3. Camel Split 逻辑获取 List 并注意到它是项目列表,因此遍历它以获取要转发的消息
(这是在 Camel 中由 org.apache.camel.processor.Splitter -> SplitterIterable 完成的,它调用 ObjectHelper.createIterator(value) )
一些示例代码:
路线:
from("direct:myRouteEntrypoint")
.to("myProducer")
.split().body()
.to("log:logProducedMessages");
定制生产者:
/**
* MyProducer
*
* Produces a List of String objects and sets that List into the body of
* the message, to then be later "split"
*/
public class MyProducer implements Processor {
public MyProducer() {
}
public void process(final Exchange inExchange) {
List<String> ongoingMessages = new ArrayList<>();
// some loop for each message
for (int i = 0; i < 5; i++) {
String body = "Output message from MyProducer, this is output message: " + i;
body += ", original message body: " + inExchange.getIn().getBody();
ongoingMessages.add(body);
}
inExchange.getIn().setBody(ongoingMessages);
}
}
您可以看到每个“正在进行的消息”都可以访问发起它的原始消息。如果您运行此代码,并向路由发送一条消息,您将看到 5 条正在进行的消息正在创建并发送到“logProducedMessages”记录器。