0

您好我有以下情况:我有一个客户端,它可以向多个服务器询问答案。根据输入,只有一个服务器可以回答。只有服务器知道他们回答什么。我曾尝试requestReply在 Apache Camel 中使用该模式,但我有一个小问题。

public void configure() // server1
{
    from("activemq:topic:topicName").choice().when(header("cc").endsWith(5)).process(new Processor() {
        @Override
        public void process(Exchange _exchange) throws Exception {
            _exchange.getOut().setBody(".....................Returning.from server1......");
        }
    });
}

public void configure() //server2
{
    String replyChannel = "activemq:replyChannel";
    from("activemq:topic:topicName").choice().when(header("cc").endsWith(6)).process(new Processor() {
        @Override
        public void process(Exchange _exchange) throws Exception {
            _exchange.getOut().setBody(".....................Returning from server2.");
        }
    });
}
....
String event = "test 1";
Object result = amqProducer.getProducerTemplate().sendBodyAndHeader("activemq:topic:topicName", ExchangePattern.InOut, event,"cc" ,event.length());
System.out.println("Result "+result);    

所以我希望上面的示例返回“...从 server2 返回。”,因为长度为 6。这也是结果,但 Camel 发出警告“收到未知相关 ID 的回复”,这是由于代码server1 的,它隐式也返回一个答案。

有没有更优雅的方法来解决这个问题?

4

1 回答 1

1

我认为你可以通过使用Scatter Gather模式来取得很好的效果。

解决方案应如下所示:

from("activemq:topic:topicName")
       .multicast()
           .to("seda:server1", "seda:server2", "seda:server3");

from("seda:server1").to("my_component:server1").to("seda:aggregator");
from("seda:server2").to("my_component:server2").to("seda:aggregator");
from("seda:server3").to("my_component:server3").to("seda:aggregator");

 from("seda:aggregator")
       .aggregate(new SingleGroupExpression(), new ServerResponseAggregator())
           .completionSize(3)
           .completionTimeout(10000)
 .to("my_component:client")

这应该发生的是,您从topicName收到一条消息,该消息“多播”到不同的内部SEDA队列。在这些队列上接收到消息后,会将消息分派到相应的服务器。收到来自服务器的响应后,消息被发送到另一个内部 SEDA 队列,以进行聚合和进一步处理。

在这个简短的示例中,聚合策略在收到 3 条消息或达到 10 秒超时时启动。

您应该注意的一件事是,此示例暗示服务器将回复您的应用程序。如果不是这种情况,您可以通过设置特定于用于与服务器通信的组件的超时选项来处理此问题。例如,如果您使用 JMS,您可以这样做:

from("seda:server1").to("jms:queue:server1?requestTimeout=5000").to("seda:aggregator");

然后使用其中一种骆驼错误处理机制(DefaultErrorHandler、Exception Clause、Dead Letter Channel 等)来处理错误。

于 2013-08-07T19:32:12.333 回答