0

我正在将所有消息排队到 rabbitmq 队列并在远程服务器上处理这些消息。下面是我在同一个班级的制作人和回复处理程序。

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
        MessageListener {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;

    // Reply handler 
    @Override
    public void onMessage(Message message) {

        try {
            String corrId = new String(message.getMessageProperties()
                    .getCorrelationId(), "UTF-8");
            System.out.println("received " + corrId + " from " + this.replyQueue);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //Producer
    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {
            System.out.println(item);

            System.out.println("Queing " + item + " to " + this.queue);

            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);

            System.out.println("Queued " + item + " to " + this.queue);

        }

        // It should wait here untill we get all replies in onMessage, How can we do this ?

    }

我正在以 write 方法发送所有消息并在 onMessage 中获得回复。这工作正常,但 write 不等待回复,它返回给调用者,并且 spring-batch 步骤标记为已完成。

但我希望进程在发送所有消息后等待回复,直到我们在 onMessage 中获得所有回复。我们应该怎么做 ?

4

1 回答 1

0

您可以使用任意数量的同步技术;例如,让侦听器put在 a 中回复回复,LinkedBlockingQueue并让发送者take(或poll超时)从队列中取出,直到收到所有回复。

或者,根本不使用侦听器,只需从回复队列中使用相同的侦听RabbitTemplatereceive(),直到收到所有回复。

但是,如果队列为空,则receive()返回null,因此您必须在接收之间休眠以避免旋转 CPU。

于 2014-04-25T14:06:13.760 回答