2

我正在使用 Camel JMS 组件进行请求-回复以与 MQ 进行通信。对于我的一些请求,我可以收到n条回复消息。如何汇总这些回复消息?

我想过使用聚合器模式和聚合策略,但不能使用它,因为我不确定可以回复的消息数量。

社区可以帮助我了解正确的方法吗?我做了一些谷歌搜索,但找不到有用的东西。以下是我的示例路线代码

from("direct:"+routeName).routeId(routeName)
                        .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                        .circuitBreaker()
                            .resilience4jConfiguration()
                            .minimumNumberOfCalls(3)
                        .end()
                        .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                            .log("${body}")
                            .unmarshal(customerDetailsOutBound)
                            .process(new Processor() {
                                    @Override
                                    public void process(Exchange exchange) throws Exception {
                                        System.out.println(exchange.getIn().getBody().toString());
                                    }
                            })
                        .onFallback().process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Store this message to backup");
                            }
                        })
                        .end();

期待从社区中获得一些好的见解。谢谢你。

4

3 回答 3

1

消息流

  1. 您的第一条路线将消息发送到CAMELDEMO队列并开始等待新队列上的单个聚合消息CAMELDEMO_AGGREGATED_REPLY
  2. 收到消息的组件CAMELDEMO,开始向 CAMELDEMOREPLY 队列发送响应,并指示将发送多少响应
  3. 下面的第二条路由开始监听CAMELDEMOREPLY,聚合消息并将聚合消息发送到 CAMELDEMO_AGGREGATED_REPLY
  4. 您等待回复的第一条路由CAMELDEMO_AGGREGATED_REPLY获取聚合回复,接收单个消息并将其发回

原始路线已更新以等待回复CAMELDEMO_AGGREGATED_REPLY

...
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&
                replyTo=CAMELDEMO_AGGREGATED_REPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().toString());
        }
})
....

聚合消息的第二条路线

from(mqComponentBeanName+"://CAMELDEMOREPLY?
                          exchangePattern=In&requestTimeout=10000)
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
.to(mqComponentBeanName+"://CAMELDEMO_AGGREGATED_REPLY?
                          exchangePattern=Out&requestTimeout=10000)
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExch, Exchange newExchange) 
    {
        ...
        //Here you check your flag regarding the number of responses
        // you were supposed to receive, and if it is met
        // complete the aggregation by setting it to true
        oldExch.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
                ...
         return oldExchange;
     }
}
于 2020-06-22T07:06:04.510 回答
1

我能够用单条路线解决这个问题。解决方案可能不是那么整洁,但有效并实现了目的。我使用了 loopDoWhile 并在 loopDoWhile 内部的处理器中使用纯 java 代码从队列中获取消息。

from("direct:"+routeName).routeId(routeName)
                    .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                    .circuitBreaker()
                        .resilience4jConfiguration()
                        .minimumNumberOfCalls(3)
                    .end()
                    .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                        .log("${body}")
                        .unmarshal(customerDetailsOutBound)
                        .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    System.out.println(exchange.getIn().getBody().toString());


int msgCount = getMsgCountfromFirstReposnse;
if (msgCount > 1) {
exchange.getIn().setHeader("COUNTER", 0);
exchange.getIn().setHeader("MSG_COUNT", msgCount-1);
exchange.setProperty("connectionFactory", connectionFactory);
}
                                }
                        })
                    .loopDoWhile(simple("${headers.COUNTER} != ${headers.MSG_COUNT}"))
                            .process(simpleJMSConsumerProcess)
                        .end().endCircuitBreaker()
                    .onFallback().process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Store this message to backup");
                        }
                    })

处理器内部代码:

ConnectionFactory connectionFactory = (ConnectionFactory) exchange.getProperty("connectionFactory");
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

    try {
        Queue queue = session.createQueue("CAMELDEMOREPLY?consumer.priority=10");
        MessageConsumer consumer = session.createConsumer(queue, "JMSCorrelationID = '"+exchange.getIn().getHeader("JMSCorrelationID").toString()+"'");
        connection.start();
        TextMessage textMsg = (TextMessage) consumer.receive();
        System.out.println(textMsg);
        System.out.println("Received: " + textMsg.getText());
        exchange.getIn().setHeader("COUNTER", ((Integer)exchange.getIn().getHeader("COUNTER"))+1);
        if (connection != null) {
            connection.close();
        }
    } finally {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
于 2020-06-22T10:59:20.183 回答
0

好吧,传统的请求-回复设计只有 1 条回复消息。等待响应的线程在第一个回复到达后立即停止侦听。

使用 JMS 相关 ID(每个请求没有专用线程),理论上可以接收同一请求的多个回复,但我不知道这是否真的有效/在 JMS 中是否允许。

根据评论更新

您在评论中写道,您可以针对一个请求收到多个 JMS 回复,甚至可以获得预期的回复数量。

如果这一切正常,您可以在您的 Camel 路由中使用聚合器 EIP来收集所有响应,然后再向调用者发送回复。

聚合器是高度可配置的。您可以决定如何组合响应,还可以定义多个完成条件(超时、消息数等)。

于 2020-06-22T06:12:10.377 回答