我有一个将消息发布到 RabbitMQ 服务器的 java 应用程序。
当可用磁盘空间低于 rabbit 的低水位线时,我会出现意外行为。
预期的行为是连接将成为阻塞,使我的应用程序挂起调用Channel.basicPublish
.
实际行为是连接似乎在管理控制台中被阻塞,但调用Channel.basicPublish
返回时没有错误,并且应该发布的消息丢失了。
这种行为破坏了 RabbitMQ 最重要的特性,即健壮性。
下面是我的测试应用程序的最小版本。它所做的只是每秒发布一条带有递增索引 (1, 2, 3, ...) 的消息。RabbitMQ 服务器可以很好地接收消息,直到我将低水位标记设置为非常高的值,方法是将以下行放入rabbitmq.config
文件中:
[
{rabbit, [{disk_free_limit, 60000000000}]}
].
重新启动服务器后,我在管理控制台中收到磁盘空间不足通知,连接被标记为“阻塞”,服务器不再收到任何消息。但是,应用程序继续运行并发送消息,就好像没有任何问题一样。当我将水印降低到正常值时,服务器会再次接收到消息,但是在连接阻塞时发送的所有消息都会丢失。
- 难道我做错了什么?
- 这是 RabbitMQ 中的错误吗?
- 如果是这样,是否有解决方法?
操作系统:Windows 8 64bit
RabbitMQ 服务器版本:3.1.1
RabbitMQ Java 客户端版本:3.1.0
测试应用代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class Main {
private final static Logger logger = LoggerFactory.getLogger(Main.class);
private final static String QUEUE_NAME = "testQueue";
private static Channel channel = null;
private static void connectToRabbitMQ() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(
QUEUE_NAME,
true, // Durable - survive a server restart
false, // Not exclusive to this connection
false, // Do not autodelete when no longer in use
null // Arguments
);
}
private static void disposeChannel()
{
if (channel == null) {
return;
}
try {
channel.close();
} catch (Exception e) {
} finally {
channel = null;
}
}
public static void main(String[] args) throws Exception {
boolean interrupted = false;
int messageNumber = 1;
while (!interrupted) {
byte[] message = Integer.toString(messageNumber).getBytes();
try {
if (channel == null) {
connectToRabbitMQ();
}
channel.basicPublish(
"",
QUEUE_NAME,
MessageProperties.MINIMAL_PERSISTENT_BASIC,
message
);
logger.info("Published message number {}", messageNumber);
messageNumber++;
} catch (Exception e) {
logger.info("Unable to connect to RabbitMQ...");
disposeChannel();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.info("Interrupted");
interrupted = true;
}
}
}
}