12

如果我连接到 RabbitMQ 并使用 EventingBasicConsumer 侦听事件,我如何判断我是否已与服务器断开连接?

我知道有一个 Shutdown 事件,但是如果我拔下网络电缆来模拟故障,它不会触发。

我还尝试了 ModelShutdown 事件和模型上的 CallbackException,但似乎都没有。

编辑-----我标记为答案的那个是正确的,但这只是我解决方案的一部分。RabbitMQ 中还内置了 HeartBeat 功能。服务器在配置文件中指定它。它默认为 10 分钟,但您当然可以更改它。

客户端还可以通过在 ConnectionFactory 实例上设置 RequestedHeartbeat 值来请求不同的心跳间隔。

4

2 回答 2

9

我猜你正在使用 C# 库?(但即便如此,我认为其他人也有类似的事件)。

您可以执行以下操作:

public class MyRabbitConsumer
{
  private IConnection connection;

  public void Connect()
  {
    connection = CreateAndOpenConnection();
    connection.ConnectionShutdown += connection_ConnectionShutdown;
  }

  public IConnection CreateAndOpenConnection() { ... }

  private void connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
  {

  }
}
于 2013-02-27T12:05:36.493 回答
2

这是一个例子,但标记的答案是导致我这样做的原因。

var factory = new ConnectionFactory
{
    HostName = "MY_HOST_NAME",
    UserName = "USERNAME",
    Password = "PASSWORD",
    RequestedHeartbeat = 30
};

using (var connection = factory.CreateConnection())
{
    connection.ConnectionShutdown += (o, e) =>
    {                       
        //handle disconnect                            
    };

    using (var model = connection.CreateModel())
    {
        model.ExchangeDeclare(EXCHANGE_NAME, "topic");
        var queueName = model.QueueDeclare();

        model.QueueBind(queueName, EXCHANGE_NAME, "#"); 

        var consumer = new QueueingBasicConsumer(model);
        model.BasicConsume(queueName, true, consumer);

        while (!stop)
        {
            BasicDeliverEventArgs args;                       
            consumer.Queue.Dequeue(5000, out args);

            if (stop) return;

            if (args == null) continue;
            if (args.Body.Length == 0) continue;

            Task.Factory.StartNew(() =>
            {
                //Do work here on different thread then this one
            }, TaskCreationOptions.PreferFairness);
        }
    }
}

关于这一点有几点需要注意。

我正在使用 # 作为主题。这抓住了一切。通常你想限制一个主题。

我正在设置一个名为“stop”的变量来确定进程何时结束。您会注意到循环永远运行,直到该变量为真。

如果没有新消息,出队等待 5 秒然后离开而不获取数据。这是为了确保我们监听那个停止变量并在某个时候真正退出。根据您的喜好更改值。

当有消息进来时,我在一个新线程上生成处理代码。当前线程被保留用于仅收听 rabbitmq 消息,如果处理程序处理时间过长,我不希望它减慢其他消息的速度。根据您的实施,您可能需要也可能不需要。但是要小心编写代码来处理消息。如果运行需要一分钟并且您在亚秒级时间内收到消息,您将耗尽内存或至少会出现严重的性能问题。

于 2015-01-18T17:39:30.463 回答