0

我有一条多播到 2 个地方的路由。如果调用地点1时发生异常,我无法保留聚合结果。在 onException 的处理器内部,我在聚合期间创建的 Map 不存在。我使用骆驼2.25。

onException(RuntimeException.class)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Map<String, String> results = exchange.getProperty(SimpleAggregationStrategy.RESULTS, Map.class);
            System.out.println(results);
        }
    });

from(DIRECT_FIRST)
    .log("First route")
    .setBody(constant("FIRST TEXT"));

from(DIRECT_SECOND)
    .log("Second route")
    .setBody(constant("SECOND TEXT"))
    .throwException(new RuntimeException("Dummy Exception"));

from(DIRECT_ENTRY)
    .multicast().stopOnException().aggregationStrategy(new AggregationStrategy() {
        public static final String RESULTS = "RESULTS";

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("INSIDE SimpleAggregationStrategy !!!!!!!!!!!!!!!!");
            Map<String, String> results;
            if (oldExchange != null) {
                results = oldExchange.getProperty(RESULTS, Map.class);
            } else {
                results = new HashMap<>();
            }
            results.put(newExchange.getIn().getBody(String.class), newExchange.getIn().getBody(String.class));
            return newExchange;
        }
    })
    .to(DIRECT_FIRST, DIRECT_SECOND);
4

2 回答 2

0

我假设聚合器中止处理stopOnException(),因此不会返回(不完整的)结果

您可以尝试将聚合策略放入上下文管理的 bean 中,并使 Map 成为可通过 getter 方法访问的实例变量。

如果出现异常,您可以尝试从 bean 中获取不完整的 Map。但我不知道它是否仍然保存数据,或者在处理中止时它是否被清空。

于 2020-02-25T12:19:00.187 回答
0

解决方案比我想象的要简单。我们只需要在多播步骤之前创建一个 exchangeProperty。然后即使在进行多播时出现异常的情况下,该交换属性也可以存储聚合结果

onException(RuntimeException.class)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Map<String, String> results = exchange.getProperty("RESULTS", Map.class);
            System.out.println(results);
        }
    });

from(DIRECT_FIRST)
    .log("First route")
    .setBody(constant("FIRST TEXT"));

from(DIRECT_SECOND)
    .log("Second route")
    .setBody(constant("SECOND TEXT"))
    .throwException(new RuntimeException("Dummy Exception"));

from(DIRECT_ENTRY)
    .process(exch -> {
        exch.setProperty("RESULTS", new HashMap<String, String>())
    })
    .multicast().stopOnException().aggregationStrategy(new AggregationStrategy() {
        public static final String RESULTS = "RESULTS";

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Map<String, String> results;
            if (oldExchange != null) {
                results = oldExchange.getProperty(RESULTS, Map.class);
            } else {
                results = new HashMap<>();
            }
            results.put(newExchange.getIn().getBody(String.class), newExchange.getIn().getBody(String.class));
            return newExchange;
        }
    })
    .to(DIRECT_FIRST, DIRECT_SECOND);
于 2020-05-07T22:38:26.890 回答