5

我有一条 apache 骆驼路线,它正在交换主体上处理 POJO。

请查看从 1 到 3 标记的行序列。

    from("direct:foo")
        .to("direct:doSomething")         // 1 (POJO on the exchange body)
        .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
        .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
    ;

现在我需要put在组件上使用操作hazelcast,不幸的是需要将 body 设置为值 -1。

    from("direct:storeInHazelcast")
            .setBody(constant(-1))
            .setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
            .setHeader(HazelcastConstants.OBJECT_ID, constant(LAST_FLIGHT_UPDATE_SEQ))
            .to("hazelcast:map:MyNumber")
    ;

对于标记为 2 的行,我想将交换的副本发送到storeInHazelcast路由。

首先,我试过.multicast()了,但交换体仍然搞砸了(到-1)。

        // shouldnt this copy the exchange?
        .multicast().to("direct:storeInHazelcast").end()

然后我尝试.wireTap()了 ,它作为“即发即弃”(异步)模式工作,但我实际上需要它来阻止,并等待它完成。你可以制作wireTap块吗?

        // this works but I need it to be sync processing (not async)
        .wireTap("direct:storeInHazelcast").end()

所以我在这里寻找一些提示。据我所知,multicast()应该复制了交换,但setBody()在我的storeInHazelcast路线中看到搞砸了原来的交换。

或者,也许还有其他方法可以做到这一点。

提前致谢。骆驼 2.10

4

5 回答 5

6

我想我偶然发现了答案,2可以enrich()像这样从 dsl 使用线路:

    .enrich("direct:storeInHazelcast", new KeepOriginalAggregationStrategy())

在哪里:

public class KeepOriginalAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        return oldExchange;
    }
}

有趣的是,我找到了一个名为 的聚合策略UseOriginalAggregationStrategy(),但我看不到如何指定Exchange original从 DSL 命名的参数。

    .enrich("direct:storeInHazelcast",
        new UseOriginalAggregationStrategy(???, false))

在 dsl 中没有某种getExchange()方法的情况下,我在这里看不到如何使用这种聚合策略(但如果有人可以建议如何使用,请执行)。

于 2014-01-07T12:03:19.450 回答
5

您无需编写自己的聚合策略即可使用

.enrich("direct:storeInHazelcast", AggregationStrategies.useOriginal())
于 2016-07-26T11:04:02.750 回答
4

将其保存在标题中并恢复它。

from("direct:foo")
    .to("direct:doSomething")         // 1 (POJO on the exchange body)
    .setHeader("old_body", body())    // save body
    .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
    .setBody(header("old_body"))      // RESTORE the body
    .removeHeader("old_body")         // cleanup header
    .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
;

这是破坏性组件的一个相当常见的范例。

于 2014-01-07T20:21:14.937 回答
1

我也有这个要求(在另一条路由上执行同步、仅处理),为了实现它,我编写了一个自定义处理器,它以编程方式发送 Exchange 的副本。我认为这会产生更好的 DSL,其中使用点的语义比使用丰富更清晰。

这个静态辅助方法创建处理器:

public static Processor synchronousWireTap(String uri) {
    return exchange -> {
        Exchange copy = exchange.copy();

        exchange.getContext().createProducerTemplate().send(uri,copy);

        //ProducerTemplate.send(String,Exchange) does not, unlike other send methods, rethrow an exception
        //on the exchange. We want any unhandled exception to be rethrown, so we must do so here.
        Throwable thrown = copy.getException(Throwable.class);

        if (thrown != null) {
            throw new CamelExecutionException(thrown.getMessage(), exchange, thrown);
        }
    };
}

这是一个使用示例:

from("direct:foo")
    .to("direct:doSomething")                               // 1 (POJO on the exchange body)
    .process(synchronousWireTap("direct:storeInHazelcast")) // 2 (Does not destroy POJO because a copy of the exchange gets sent to this uri)
    .to("direct:doSomethingElse")                           // 3 (POJO is still there)

请注意,此自定义处理器并非完全是标准wireTap() 的同步模拟,后者完全是in-only,因为此处理器重新抛出目标路由上发生的任何未处理的异常-但消息本身保持不变。这是我的要求,因为我想做的是在另一条路由上同步执行一些其他处理,并在失败时收到通知,否则我的主路由上的消息不会受到影响(相当于调用 void 方法在程序代码中)。

于 2018-07-20T10:54:19.563 回答
-1

您可以使用窃听中的 copy="true" 选项来复制http://camel.apache.org/wire-tap.html中提到的交换,或者您可以创建自己的处理器来做同样的事情。

于 2017-06-29T09:15:08.913 回答