4

我实现了一个消费者,如果底层连接关闭,它会在一段时间后自动重新连接到代理。我的情况如下:

  1. 成功启动 RabbitMQ 服务器。
  2. 成功启动消费者。
  3. 发布消息,消费者成功接收。
  4. 停止 RabbitMQ 服务器,消费者会显示异常:

    com.rabbitmq.client.ShutdownSignalException:连接错误;原因:{#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}。

    然后消费者将在重新连接之前休眠 60 秒。

  5. 再次启动 RabbitMQ 服务器。
  6. 发布消息成功,命令'list_queues'的结果为0
  7. 60 秒后,消费者再次连接到 RabbitMQ,但现在收到的消息在步骤#6 发布。
  8. 发布第三条消息,消费者成功接收。

在这种情况下,重新连接之前发布的所有消息都将丢失。我还进行了另一个实验。

  1. 启动 RabbitMQ,并成功发布消息(未启动消费者进程)。
  2. 停止 RabbitMQ,然后重新启动它。
  3. 启动消费者进程,成功接收到步骤#1发布的消息。

注意:消费者的QOS是1。我研究了RabbitMQ几天,据我了解,消费者应该在重新连接之前获得发布的消息。请帮助(我基于 windows rabbitMQ 进行了测试)。

以下是发布者:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();               
channel = conn.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2
// (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();

消费者如下:

    @Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
        final int qos) throws IOException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(this.getHost());

    while (true) {
        Connection connection = null;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(exchangeName, "topic");
            // declare a durable, non-exclusive, non-autodelete queue.
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            // distribute workload among all consumers, consumer will
            // pre-fetch
            // {qos}
            // messages to local buffer.
            channel.basicQos(qos);

            logger.debug(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // disable auto-ack. If enable auto-ack, RabbitMQ delivers a
            // message to
            // the customer it immediately removes it from memory.
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                try {
                    RabbitMessageConsumer.this.consumeMessage(delivery);
                }
                catch (Exception e) {
                    // the exception shouldn't affect the next message
                    logger.info("[IGNORE]" + e.getMessage());
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
        catch (Exception e) {
            logger.warn(e);
        }

        if (autoReconnect) {
            this.releaseConn(connection);
            logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
                    + this.reconnectInterval / 1000 + " seconds.");
            Thread.sleep(this.getReconnectInterval());
        }
        else
            break;
    }
}

private void releaseConn(Connection conn) {
    try {
        if (conn != null)
            conn.close();
    }
    catch (Exception e) {
        // simply ignore this exception
    }
}

因为它是一个“主题”交换,所以在 PUBLISHER 没有声明队列。然而,在第 1 次测试的第 3 步中,已声明持久队列,并且消息也是持久的。我不明白为什么在重新连接之前消息会丢失。

4

4 回答 4

8

哦,我找到了原因......消息和队列当然是持久的,但是交换不是持久的。由于交换是不持久的,队列和交换之间的绑定信息将在 RabbitMQ 代理重启之间丢失。

现在我将交换声明为持久的,消费者可以获得在消费者重启之前和代理重启之后发布的消息。

于 2013-08-14T09:01:29.560 回答
6

如果发布时没有队列,则消息将丢失。您是否连接到同一个队列,如果是,它是持久的,还是在重新启动 RMQ 服务器后发布消息之前重新创建它?听起来解决方案是以下之一:

  1. 使队列持久并在重新启动后重新连接到该队列
  2. 确保在发布之前创建队列。

此外,请确保您重新连接到消费者中的正确队列。1)可能是两种解决方案中更好的一种。

于 2013-08-11T09:20:03.447 回答
3

代替 channel.queueDeclare(QUEUE_NAME, true, false, false, null);

用这个channel.queueDeclare(QUEUE_NAME, false, false, false, null);

Rabbitmq-java教程

于 2019-09-17T20:22:53.033 回答
0

从 3.1 升级到 3.3 后,这发生在我的 RabbitMQ 集群上。我的解决方案是删除/var/lib/rabbitmq/mnesia目录。

于 2014-06-12T08:16:16.467 回答