9

I want to process messages from a rabbitMq queue in parallel. The queue is configured to be autoAck =false. I am using the camel-rabbitMQ support for camel endpoints, which has support for a threadPoolSize parameter, but this does not have the desired effect. Messages are still processed serially off the queue, even when threadpoolsize=20.

From debugging through the code I can see that the threadpoolsize parameter is used to create an ExecutorService that is used to pass to the rabbit connectionfactory as described here. This all looks good until you get into the rabbit ConsumerWorkService. Here messages are processed in block of max size 16 messages. Each message in a block is processed serially and then if there is more work to do the executor service is invokes with the next block. A code snippet of this is below. From this use of the executor service I can't see how the messages can be processed in parallel. The executorservice only ever has one piece of work to perform at a time.

What am I Missing?

private final class WorkPoolRunnable implements Runnable {

        public void run() {
            int size = MAX_RUNNABLE_BLOCK_SIZE;
            List<Runnable> block = new ArrayList<Runnable>(size);
            try {
                Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
                if (key == null) return; // nothing ready to run
                try {
                    for (Runnable runnable : block) {
                        runnable.run();
                    }
                } finally {
                    if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
                        ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
                    }
                }
            } catch (RuntimeException e) {
                Thread.currentThread().interrupt();
            }
        }
4

2 回答 2

6

RabbitMQ 的文档对此不是很清楚,但是,即使ConsumerWorkService使用线程池,该池似乎也没有用于并行处理消息:

每个 Channel 都有自己的调度线程。对于每个通道一个消费者的最常见用例,这意味着消费者不会阻碍其他消费者。如果每个 Channel 有多个 Consumer,请注意长时间运行的 Consumer 可能会阻止向该 Channel 上的其他 Consumer 发送回调。

( http://www.rabbitmq.com/api-guide.html )

该文档建议Channel每个线程使用一个,实际上,如果您只是创建Channel与所需并发级别一样多的 s,则消息将在链接到这些通道的消费者之间分派。

我已经用 2 个通道和消费者进行了测试:当队列中有 2 条消息时,每个消费者一次只能选择一条消息。您提到的 16 条消息块似乎没有干扰,这是一件好事。

事实上,Spring AMQP 还创建了多个通道来同时处理消息。这是通过以下方式完成的:

我还测试了它是否按预期工作。

于 2014-09-30T10:00:48.517 回答
6

如果您有一个Channel实例,它将按照您通过检查正确发现的顺序依次调用其注册的消费者ConsumerWorkService。有两种方法可以克服:

  1. 使用多个渠道而不是一个渠道。
  2. 使用单一渠道,但以特殊方式实现消费者。他们应该只从队列中挑选传入的消息并将其作为任务放入内部线程池中。

您可以在这篇文章中找到更多详细信息。

于 2015-09-06T18:41:07.760 回答