我正在将所有消息排队到 rabbitmq 队列并在远程服务器上处理这些消息。下面是我在同一个班级的制作人和回复处理程序。
public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
MessageListener {
protected String exchange;
protected String routingKey;
protected String queue;
protected String replyQueue;
protected RabbitTemplate template;
// Reply handler
@Override
public void onMessage(Message message) {
try {
String corrId = new String(message.getMessageProperties()
.getCorrelationId(), "UTF-8");
System.out.println("received " + corrId + " from " + this.replyQueue);
} catch (IOException e) {
e.printStackTrace();
}
}
//Producer
@Override
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
System.out.println(item);
System.out.println("Queing " + item + " to " + this.queue);
Message message = MessageBuilder
.withBody(item.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setReplyTo(this.replyQueue)
.setCorrelationId(item.toString().getBytes()).build();
template.send(this.exchange, this.routingKey, message);
System.out.println("Queued " + item + " to " + this.queue);
}
// It should wait here untill we get all replies in onMessage, How can we do this ?
}
我正在以 write 方法发送所有消息并在 onMessage 中获得回复。这工作正常,但 write 不等待回复,它返回给调用者,并且 spring-batch 步骤标记为已完成。
但我希望进程在发送所有消息后等待回复,直到我们在 onMessage 中获得所有回复。我们应该怎么做 ?