1

我第一次使用 MQ,并尝试使用 RabbitMQ 实现日志系统。我的实施涉及“发件人”

/*
 * This class sends messages over MQ
 */
public class MQSender {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String[] levels = {"green", "orange", "red", "black"};
        for (String log_level : levels) {
            String message = "This is a " + log_level + " message";
            System.out.println("Sending " + log_level + " message");
            //publish the message with each of the bindings in levels
            channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
        }

        channel.close();
        connection.close();
    }
}

它将我的每种颜色的一条消息发送到交换,颜色将用作绑定。它涉及一个“接收者”

public class MQReceiver {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        receiveMessagesFromQueue(2);
    }

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //generate random queue
        String queueName = channel.queueDeclare().getQueue();

        //set bindings from 0 to maxLevel for the queue
        for (int level = 0; level <= maxLevel; level++) {
            channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
        }

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while(true) {
            //waits until a message is delivered then gets that message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

它作为参数给出一个数字,表示我希望它从交换中提供哪些颜色绑定。

在我的实现中,通常在 RabbitMQ 中,似乎消息存储在交换器中,直到Consumer请求它们,此时它们被分发到各自的队列,然后一次发送一个到客户端(或 MQ 中的消费者行话)。我的问题是,当我在运行MQSender课程之前运行MQReceiver课程时,消息永远不会被传递。但是,当我MQReceiver首先运行课程时,会收到消息。根据我对 MQ 的理解,我认为消息应该存储在服务器上,直到MQReceiver类运行,然后消息应该被传递给他们的消费者,但这不是正在发生的事情。我的主要问题是这些消息是否可以存储在交换中,如果不能,它们应该存储在哪里,以便在调用消费者(即我的MQReceiver类)时将它们传递?

谢谢你的帮助!

4

1 回答 1

2

如果消息的路由键与绑定到交换的任何队列不匹配,RabbitMQ 会丢弃消息。当您首先开始时MQSender,没有绑定任何队列,因此它发送的消息会丢失。当您启动时MQReceiver,它将队列绑定到交换器,因此 RabbitMQ 有一个地方可以放置来自 的消息MQSender。当您停止 MQReceiver 时,由于您创建了一个匿名队列,该队列和所有绑定都将从交换中删除。

如果您希望消息在不运行时存储在服务器MQReceiver上,您需要接收方创建一个命名队列,并将路由键绑定到该队列。请注意,创建命名队列是幂等的,如果队列已经存在,则不会创建该队列。然后,您需要接收者从命名队列中提取消息。

将您的代码更改为如下所示:

MQSender

....
String namedQueue = "logqueue";
//declare named queue and bind log level routing keys to it.
//RabbitMQ will put messages with matching routing keys in this queue
channel.queueDeclare(namedQueue, false, false, false, null);
for (int level = 0; level < LOG_LEVELS.length; level++) {
   channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]);
}
...

MQ接收器

...
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

QueueingConsumer consumer = new QueueingConsumer(channel);

//Consume messages off named queue instead of anonymous queue
String namedQueue = "logqueue";
channel.basicConsume(namedQueue, true, consumer);

while(true) {
...
于 2013-08-23T05:23:40.517 回答