1

我有一个场景,一条消息可以生成多条消息以独立于原始消息进行处理。

我已经尝试过拆分器 EIP,但它看起来有一个默认的聚合策略(我似乎无法关闭)。

我可以实现这条路线:

from("direct:in").to("bean:multipleMsgGenerator").to("direct:out")

在哪里 multipleMsgGenerator 可以向“direct:out”发送 n 条独立消息?

谢谢!

4

2 回答 2

0

默认情况下,拆分器返回原始消息(camel_version >= 2.3)。请参阅http://camel.apache.org/splitter.html。也就是说,您可以对溢出块内的各个拆分消息执行任何您想做的事情。

例如,如果消息的正文是 List,您可以这样做:

from ("direct:in)
  .split(body())
    .to("direct:processIndividualMsg")
  .end()
  .to("direct:doSomethingWithTheOriginalMsg");
于 2013-10-18T08:16:58.687 回答
0

对于任何发现此问题的人,我们找到了适合我们的解决方案。
我们收到一条需要转发多条消息的消息。这可以使用以下方法完成: 1. 一个自定义处理器,它创建一个 List 并将其设置到交换的主体中
2. 在自定义处理器之后的 split().body()
3. Camel Split 逻辑获取 List 并注意到它是项目列表,因此遍历它以获取要转发的消息
(这是在 Camel 中由 org.apache.camel.processor.Splitter -> SplitterIterable 完成的,它调用 ObjectHelper.createIterator(value) )

一些示例代码:

路线:

from("direct:myRouteEntrypoint")
.to("myProducer")
.split().body()
.to("log:logProducedMessages");

定制生产者:

/**
 * MyProducer
 * 
 * Produces a List of String objects and sets that List into the body of
 * the message, to then be later "split"
 */
public class MyProducer implements Processor {
    public MyProducer() {
    }

    public void process(final Exchange inExchange) {

        List<String> ongoingMessages = new ArrayList<>();

        // some loop for each message
        for (int i = 0; i < 5; i++) {
            String body = "Output message from MyProducer, this is output message: " + i;
            body += ", original message body: " + inExchange.getIn().getBody();
            ongoingMessages.add(body);
        }

        inExchange.getIn().setBody(ongoingMessages);
    }
}

您可以看到每个“正在进行的消息”都可以访问发起它的原始消息。如果您运行此代码,并向路由发送一条消息,您将看到 5 条正在进行的消息正在创建并发送到“logProducedMessages”记录器。

于 2017-05-25T10:43:42.647 回答