13

我正在使用 Java 客户端在 RHEL 5.3 上使用 RabbitMQ。我有 2 个节点(机器)。Node1 正在使用 Java 帮助程序类 QueueingConsumer 从 Node2 上的队列中消费消息。

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   ... Process message - delivery.getBody()
}

如果接口在 Node1 或 Node2 上关闭(例如 ifconfig eth1 down),客户端(上图)永远不会知道网络不再存在。RabbitMQ 是否在 Java 客户端上提供某种类型的配置,可用于确定连接是否已消失。关闭 Node2 上的 RabbitMQ 服务器将触发 ShutdownSignalException,可以捕获该异常,并且应用程序可以进入重新连接循环。但是关闭接口不会导致任何类型的异常发生,因此代码将在 consumer.nextDelivery() 上永远等待。

我也尝试过使用这个调用的超时版本。例如

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
int timeout_ms = 30000;
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms);
   if (delivery == null)
   {
      if (channel.isOpen() == false)             // Seems to always return true
      { throw new ShutdownSignalException(); }
   }
   else
   {
     ... Process message - delivery.getBody()
   }
}

但似乎这总是返回 true (即使接口已关闭)。我假设在连接上注册 ShutdownListener 会产生相同的结果,但还没有尝试过。

有没有办法配置某种心跳,或者你只需​​要编写自定义租约逻辑(例如“我现在在这里”)才能让它工作?

4

2 回答 2

4

一般来说,最好在 rabbitmq-discuss 邮件列表上发布有关 rabbitmq 的问题。我们不倾向于跟踪在此之外提出的问题。

您可以配置一个心跳,但默认情况下它是关闭的。您也可以打开 TCP Keep Alive。setRequestedHeartbeatConnectionFactory创建新连接之前调用,或者,子类ConnectionFactory,覆盖configureSocket方法,然后调用socket.setKeepAlive(true)。当网络中断时,两者都应该导致连接注意到。

于 2010-03-19T10:50:00.287 回答
3

关于 isOpen 方法,在文档中有很好的描述:http ://www.rabbitmq.com/api-guide.html#shutdown-atomicity

关于关闭:关闭node1或2是指应用程序,而不是RabbitMQ服务器本身?如果另一个应用程序与消息代理断开连接,您为什么想知道任何应用程序?这不是消息传递的重点。

您唯一能做的就是发送带有“强制”参数的消息。这告诉 RabbitMQ 服务器,您希望至少有 1 个侦听器来接收您发送的消息(无论是直接队列还是主题/扇出交换中的某个队列)。如果消息无法传递到任何队列,则消息将返回您的频道并转发给给定的 ReturnListener。

于 2010-04-03T21:57:50.233 回答