我正在努力实现应由多个竞争消费者同时处理的请求/回复交换。
我有一个独立的Master模块,负责生成任务队列。而且我有许多Worker -modules 应该同时使用来自该队列的消息。
这是Camel 路由的Master部分:
from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        EdgarFullIndexLocation location = 
            exchange.getIn().getBody(EdgarFullIndexLocation.class);
        exchange.getIn().setBody(location.getId().toJson(), String.class);
    }
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
    //"&transactedInOut=true" + 
    "&requestTimeout=" + Integer.MAX_VALUE +
    "&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);
这是 Camel 路由的Worker部分,我在其中使用队列:
from("activemq:queue:tasksQueue?asyncConsumer=true" + 
    "&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
    .when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
        .process(new FetchIndexTaskProcessor())
    .otherwise()
        .to("log:UNKNOWN.TASK?level=DEBUG");
这里FetchIndexTaskProcessor实现了AsyncProcessor:
public class FetchIndexTaskProcessor implements AsyncProcessor {
    @Override public void process(Exchange exchange) throws Exception {}
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        FetchIndexTask task = new FetchIndexTask(exchange, callback);
        task.start();
        return false;
    }
}
这里FetchIndexTask扩展了Thread。在start()之后,新线程负责:
- 动态添加路由。
- 阻塞直到该路由的交换完成。
- 准备对原始交流的回复。
- 最后打电话callback.done(false);。
一切正常,除了有竞争消费者的部分- 它总是一次只有一个消费者。
我尝试了很多选择,例如:
- .threads(10)在不同的地方指定一个线程池。
- 使用端点选项,例如asyncConsumer和concurrentConsumers
但似乎我错过了一些重要的东西,而且我似乎无法让它以并发的方式工作。正确的做法是什么?