2

我正在尝试使用 Java 中的 RabbitMQ 运行一个非常基本的应用程序。我很想使用 Java 同时使用消息ExecutorService。我的项目使用的是 Spring,所以我定义了我的ThreadPoolExecutorFactoryBean类似:

<bean class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"
        destroy-method="destroy">
    <property name="corePoolSize" value="8"/>   
    <property name="keepAliveSeconds" value="600"/>
    <property name="maxPoolSize" value="16"/>
    <property name="threadGroupName" value="CallbackQueue-Group"/>
    <property name="threadNamePrefix" value="CallbackQueue-Worker-"/>
</bean>

我正在使用正在执行以下操作的类将这个 bean 注入到我的主消息队列中:

this.connection = getConnectionFactory().newConnection(getQueueExecutor());
this.channel = this.connection.createChannel();
this.channel.queueDeclare(getQueueName(), true, false, false, null);
this.channel.basicConsume(getQueueName(), false, new DefaultConsumer(this.channel) {
    @Override public void handleDelivery(String consumerTag, Envelope envelope, 
            BasicProperties properties, byte[] body) throws IOException {
        logger.debug("Received message {}", properties.getCorrelationId());

        try { Thread.sleep(3000); } catch (InterruptedException e) {};

        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
});

简而言之,当我将多条消息发布到队列时,我应该看到日志语句非常接近地发生,即使任务需要一段时间才能执行。但是,我看到我的消费者一次只处理一项任务,尽管ExecutorService!更奇怪的是,我实际上在池服务队列中看到了不同的线程,尽管从来没有同时出现:

12:43:40.650 [CallbackQueue-Worker-2] DEBUG MyApplication - Received message 65bfbba29b4965eb0674c082c73dad7c
12:43:43.737 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 2a0b29012b13857c5a0ae8060f66dbaa
12:43:46.755 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 3c0742f9a284ac9c6b602200254c70db
12:43:49.769 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message a462236fab19d51ba4bfea1582410a64
12:43:52.783 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 1a4713e1066dfc9e4ec1302098450a1f

我在这里做错了什么?ThreadPoolExecutorFactoryBean我在我的或我的 RabbitMQ 代码中是否遗漏了一些额外的配置?

4

1 回答 1

1

根据 com.rabbitmq.client.Channel 的描述:

虽然一个 Channel 可以被多个线程使用,但确保只有一个线程同时执行一个命令很重要。命令的并发执行可能会导致抛出 UnexpectedFrameError。

这可能是一个原因吗?您的日志显示使用了不同的工人(我们看到 2 和 3),但一次只使用一个工人。

于 2012-07-26T20:22:33.917 回答