我正在努力实现应由多个竞争消费者同时处理的请求/回复交换。
我有一个独立的Master模块,负责生成任务队列。而且我有许多Worker -modules 应该同时使用来自该队列的消息。
这是Camel 路由的Master部分:
from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
EdgarFullIndexLocation location =
exchange.getIn().getBody(EdgarFullIndexLocation.class);
exchange.getIn().setBody(location.getId().toJson(), String.class);
}
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
//"&transactedInOut=true" +
"&requestTimeout=" + Integer.MAX_VALUE +
"&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);
这是 Camel 路由的Worker部分,我在其中使用队列:
from("activemq:queue:tasksQueue?asyncConsumer=true" +
"&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
.when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
.process(new FetchIndexTaskProcessor())
.otherwise()
.to("log:UNKNOWN.TASK?level=DEBUG");
这里FetchIndexTaskProcessor实现了AsyncProcessor:
public class FetchIndexTaskProcessor implements AsyncProcessor {
@Override public void process(Exchange exchange) throws Exception {}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
FetchIndexTask task = new FetchIndexTask(exchange, callback);
task.start();
return false;
}
}
这里FetchIndexTask扩展了Thread。在start()之后,新线程负责:
- 动态添加路由。
- 阻塞直到该路由的交换完成。
- 准备对原始交流的回复。
- 最后打电话
callback.done(false);
。
一切正常,除了有竞争消费者的部分- 它总是一次只有一个消费者。
我尝试了很多选择,例如:
.threads(10)
在不同的地方指定一个线程池。- 使用端点选项,例如
asyncConsumer
和concurrentConsumers
但似乎我错过了一些重要的东西,而且我似乎无法让它以并发的方式工作。正确的做法是什么?