我正在尝试使用 spring amqp 使用rabbitmq,下面是我的配置。
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" routing-key="${rabbitmq.import.queue}"/>
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" method="onMessage" />
</rabbit:listener-container>
这是一个简单的消息监听器类,
public class ImportMessageListener {
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
return message;
}
}
这是生产者(即春季批次的 itemWriter),
public class ImportItemWriter<T> implements ItemWriter<T> {
private AmqpTemplate template;
public AmqpTemplate getTemplate() {
return template;
}
public void setTemplate(AmqpTemplate template) {
this.template = template;
}
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}
}
当我运行我的春季批处理作业时,每条消息都会被一一发送和处理,我在响应中得到了回复
consumer output: 1
producer output: 1
consumer output: 2
producer output: 2
consumer output: 3
producer output: 3
consumer output: 4
producer output: 4
consumer output: 5
producer output: 5
它应该发送 5 条消息并将它们排队,并且 5 个消费者线程(并发 = 5)应该同时处理它们,并且应该在它完成后立即响应
So below should be the outout
consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5
producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5
我不希望生产者等待第一条消息的回复来排队第二条消息。
我尝试使用 convertAndSend 使其异步(不等待回复)但是如何在 itemWriter 中获得回复消息,就像我可以使用 convertSendAndReceive 一样?
如果我将模板配置更改为
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
<rabbit:reply-listener/>
</rabbit:template>
如果我使用 template.convertAndSend(item.toString()); 那我怎样才能得到回复消息?
我无法将自己的消息处理程序附加到此侦听器以获取回复消息,就像我们可以在消费者端附加的方式一样。对于回复,它采用默认的 RabbitmqTemplate 处理程序。