4

我正在努力实现应由多个竞争消费者同时处理的请求/回复交换。

我有一个独立的Master模块,负责生成任务队列。而且我有许多Worker -modules 应该同时使用来自该队列的消息。

这是Camel 路由的Master部分:

from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        EdgarFullIndexLocation location = 
            exchange.getIn().getBody(EdgarFullIndexLocation.class);
        exchange.getIn().setBody(location.getId().toJson(), String.class);
    }
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
    //"&transactedInOut=true" + 
    "&requestTimeout=" + Integer.MAX_VALUE +
    "&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);

这是 Camel 路由的Worker部分,我在其中使用队列:

from("activemq:queue:tasksQueue?asyncConsumer=true" + 
    "&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
    .when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
        .process(new FetchIndexTaskProcessor())
    .otherwise()
        .to("log:UNKNOWN.TASK?level=DEBUG");

这里FetchIndexTaskProcessor实现了AsyncProcessor

public class FetchIndexTaskProcessor implements AsyncProcessor {
    @Override public void process(Exchange exchange) throws Exception {}

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        FetchIndexTask task = new FetchIndexTask(exchange, callback);
        task.start();
        return false;
    }

}

这里FetchIndexTask扩展了Thread。在start()之后,新线程负责:

  1. 动态添加路由。
  2. 阻塞直到该路由的交换完成。
  3. 准备对原始交流的回复。
  4. 最后打电话callback.done(false);

一切正常,除了有竞争消费者的部分- 它总是一次只有一个消费者。

我尝试了很多选择,例如:

  • .threads(10)在不同的地方指定一个线程池。
  • 使用端点选项,例如asyncConsumerconcurrentConsumers

但似乎我错过了一些重要的东西,而且我似乎无法让它以并发的方式工作。正确的做法是什么?

4

1 回答 1

2

如果您使用 Camel 2.9 或更高版本,那么我建议在您请求/回复的 activemq 端点上使用 replyToType=Exclusive。这告诉 Camel 队列是独占的,并且它加快了速度,因为不需要 JMS 消息选择器来拾取预期的相关消息。

请参阅Camel JMS 文档中的Request-reply over JMS部分:http: //camel.apache.org/jms

如果您使用临时队列,那也很快,因为不需要 JMS 消息选择器。

此外,您的路线从直接端点开始。这是一个同步调用,因此调用者将等待/阻塞,直到 Exchange 完全完成。

此外,Splitter EIP 可以配置为以并行模式运行,该模式将使用并发处理。如果你有一条大消息要拆分,那么考虑使用流式传输,它会按需拆分消息,而不是将整个消息内容加载到内存中。

无论如何,这条路线上发生了很多事情。你能更准确地指出你有问题的地方吗?它使帮助变得更容易。

于 2012-04-11T04:47:48.430 回答