2

我正在尝试使用 spring amqp 使用rabbitmq,下面是我的配置。

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"  routing-key="${rabbitmq.import.queue}"/>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener"  method="onMessage" />
</rabbit:listener-container>

这是一个简单的消息监听器类,

public class ImportMessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
        return message;
    }

}

这是生产者(即春季批次的 itemWriter),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

当我运行我的春季批处理作业时,每条消息都会被一一发送和处理,我在响应中得到了回复

consumer output: 1
producer output: 1

consumer output: 2
producer output: 2

consumer output: 3
producer output: 3

consumer output: 4
producer output: 4


consumer output: 5
producer output: 5

它应该发送 5 条消息并将它们排队,并且 5 个消费者线程(并发 = 5)应该同时处理它们,并且应该在它完成后立即响应

So below should be the outout

consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5

producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5

我不希望生产者等待第一条消息的回复来排队第二条消息。

我尝试使用 convertAndSend 使其异步(不等待回复)但是如何在 itemWriter 中获得回复消息,就像我可以使用 convertSendAndReceive 一样?

如果我将模板配置更改为

<rabbit:template id="importAmqpTemplate"
        connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
        routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
        <rabbit:reply-listener/>
    </rabbit:template>

如果我使用 template.convertAndSend(item.toString()); 那我怎样才能得到回复消息?

我无法将自己的消息处理程序附加到此侦听器以获取回复消息,就像我们可以在消费者端附加的方式一样。对于回复,它采用默认的 RabbitmqTemplate 处理程序。

4

1 回答 1

4

首先让我解释一下发生了什么

您使用同步 sendAndReceive操作。在发送您的消息之前,它会被丰富,TemporaryReplyQueue并且发送者(生产者)线程被阻塞以等待来自该消息的回复replyQueue。这就是为什么您的所有消息都是按顺序处理的。

要继续,我们需要知道在单向ItemWriter生产者中使用该操作的原因。

也许RabbitTemplate#convertAndSend对你来说就足够了吗?

更新

也许您只需要RabbitTemplate.ReturnCallback与 一起convertAndSend确定您的消息是否已传递?

更新 2

另一种想法是实现您使用TaskExecutor并行发送和接收的要求。这样,您无需等待第一条消息的依赖即可发送第二条消息:

final List<Object> replies = new ArrayList<Object>();
ExecutorService executor = Executors.newCachedThreadPool();
for (T item : items) {
     executor.execute(new Runnable() {

         public void run() {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
            replies.add(reply);
         }
     });
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

replies之后,您可以对 all for current做一些事情items

于 2014-04-21T11:33:31.410 回答