我实现了一个消费者,如果底层连接关闭,它会在一段时间后自动重新连接到代理。我的情况如下:
- 成功启动 RabbitMQ 服务器。
- 成功启动消费者。
- 发布消息,消费者成功接收。
停止 RabbitMQ 服务器,消费者会显示异常:
com.rabbitmq.client.ShutdownSignalException:连接错误;原因:{#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}。
然后消费者将在重新连接之前休眠 60 秒。
- 再次启动 RabbitMQ 服务器。
- 发布消息成功,命令'list_queues'的结果为0
- 60 秒后,消费者再次连接到 RabbitMQ,但现在收到的消息在步骤#6 发布。
- 发布第三条消息,消费者成功接收。
在这种情况下,重新连接之前发布的所有消息都将丢失。我还进行了另一个实验。
- 启动 RabbitMQ,并成功发布消息(未启动消费者进程)。
- 停止 RabbitMQ,然后重新启动它。
- 启动消费者进程,成功接收到步骤#1发布的消息。
注意:消费者的QOS是1。我研究了RabbitMQ几天,据我了解,消费者应该在重新连接之前获得发布的消息。请帮助(我基于 windows rabbitMQ 进行了测试)。
以下是发布者:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel = conn.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2
// (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();
消费者如下:
@Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
final int qos) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
while (true) {
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic");
// declare a durable, non-exclusive, non-autodelete queue.
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// distribute workload among all consumers, consumer will
// pre-fetch
// {qos}
// messages to local buffer.
channel.basicQos(qos);
logger.debug(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// disable auto-ack. If enable auto-ack, RabbitMQ delivers a
// message to
// the customer it immediately removes it from memory.
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
try {
RabbitMessageConsumer.this.consumeMessage(delivery);
}
catch (Exception e) {
// the exception shouldn't affect the next message
logger.info("[IGNORE]" + e.getMessage());
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
catch (Exception e) {
logger.warn(e);
}
if (autoReconnect) {
this.releaseConn(connection);
logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
+ this.reconnectInterval / 1000 + " seconds.");
Thread.sleep(this.getReconnectInterval());
}
else
break;
}
}
private void releaseConn(Connection conn) {
try {
if (conn != null)
conn.close();
}
catch (Exception e) {
// simply ignore this exception
}
}
因为它是一个“主题”交换,所以在 PUBLISHER 没有声明队列。然而,在第 1 次测试的第 3 步中,已声明持久队列,并且消息也是持久的。我不明白为什么在重新连接之前消息会丢失。