35

您是否有任何指示如何确定何时发生订阅问题以便我可以重新连接?

我的服务使用 RabbitMQ.Client.MessagePatterns.Subscription 进行订阅。一段时间后,我的客户默默地停止接收消息。我怀疑网络问题,因为我的 VPN 连接不是最可靠的。

我已经通读了一段时间的文档,寻找一个密钥来找出这个订阅何时可能由于网络问题而被破坏,但运气不佳。我试过检查连接和通道是否仍然打开,但它似乎总是报告它仍然打开。

它处理的消息工作得很好,并被确认回队列,所以我认为这不是“ack”的问题。

我确定我一定只是错过了一些简单的东西,但我还没有找到它。

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

更新:

我通过在虚拟机中运行服务器来模拟网络故障,当我断开连接足够长的时间时,我确实得到了一个异常(RabbitMQ.Client.Exceptions.OperationInterruptedException:AMQP 操作被中断)所以它可能不是网络问题。现在我不知道它会是什么,但运行几个小时后它就失败了。

4

1 回答 1

64

编辑:由于我对此表示赞同,我应该指出.NET RabbitMQ 客户端现在内置了此功能:https ://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

理想情况下,您应该能够使用它并避免手动实现重新连接逻辑。


我最近不得不实施几乎相同的事情。据我所知,大多数关于 RabbitMQ 的可用信息都假设您的网络非常可靠,或者您在与任何发送或接收消息的客户端相同的机器上运行 RabbitMQ 代理,从而允许 Rabbit 处理任何连接问题。

将 Rabbit 客户端设置为对断开连接具有鲁棒性并不难,但是您需要处理一些特殊情况。

您需要做的第一件事是打开心跳:

ConnectionFactory factory = new ConnectionFactory() 
{
  Uri = brokerUri,
  RequestedHeartbeat = 30,
}; 

将“RequestedHeartbeat”设置为 30 将使客户端每 30 秒检查一次连接是否仍然存在。如果不启用此功能,消息订阅者将愉快地坐在那里等待另一条消息进来,而不会知道它的连接已经坏了。

打开心跳也会使服务器检查连接是否仍然正常,这可能非常重要。如果在订阅者接收到消息之后但在确认之前连接变坏,则服务器只是假设客户端花费了很长时间,并且消息“卡在”死连接上,直到它被关闭。打开心跳后,服务器将识别连接何时出现故障并关闭它,将消息放回队列中以便另一个订阅者可以处理它。如果没有心跳,我不得不手动进入并关闭 Rabbit 管理 UI 中的连接,以便将卡住的消息传递给订阅者。

其次,您将需要处理OperationInterruptedException. 正如您所注意到的,这通常是 Rabbit 客户端在发现连接中断时会抛出的异常。如果IModel.QueueDeclare()在连接中断时调用,这是您将得到的异常。通过处理您的订阅、频道和连接并创建新的来处理此异常。

最后,您必须处理消费者在尝试使用来自已关闭连接的消息时所做的事情。不幸的是,在 Rabbit 客户端中,从队列中消费消息的每种不同方式似乎都有不同的反应。 如果您调用关闭的连接, 则QueueingBasicConsumer抛出。什么都不做,因为它只是在等待消息。从我的尝试中可以看出,您正在使用的类似乎从调用返回 true ,但值为null。再一次,通过处理你的连接、频道和订阅并重新创建它们来处理这个问题。EndOfStreamExceptionQueueingBasicConsumer.Queue.DequeueEventingBasicConsumerSubscriptionSubscription.Nextargs

当连接失败且心跳打开时,的值connection.IsOpen将更新为 False,因此您可以根据需要进行检查。但是,由于心跳在单独的线程上运行,您仍然需要处理连接在检查时打开但在subscription.Next()调用之前关闭的情况。

最后要注意的一件事是IConnection.Dispose()EndOfStreamException如果您在连接关闭后调用 dispose,此调用将抛出一个。这对我来说似乎是一个错误,我不喜欢不在IDisposable对象上调用 dispose,所以我调用它并吞下异常。

将所有这些放在一个快速而肮脏的示例中:

public bool Cancelled { get; set; }

IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;

public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
    ConnectionFactory factory = new ConnectionFactory() 
    {
        Uri = brokerUri,
        RequestedHeartbeat = 30,
    };

    while (!Cancelled)
    {               
        try
        {
            if(_subscription == null)
            {
                try
                {
                    _connection = factory.CreateConnection();
                }
                catch(BrokerUnreachableException)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    continue;
                }

                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queueName, true, false, false, null);
                _subscription = new Subscription(_channel, queueName, false);
            }

            BasicDeliverEventArgs args;
            bool gotMessage = _subscription.Next(250, out args);
            if (gotMessage)
            {
                if(args == null)
                {
                    //This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue;
                }

                handler(args.Body);
                _subscription.Ack(args);
            }
        }
        catch(OperationInterruptedException ex)
        {
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}

private void DisposeAllConnectionObjects()
{
    if(_subscription != null)
    {
        //IDisposable is implemented explicitly for some reason.
        ((IDisposable)_subscription).Dispose();
        _subscription = null;
    }

    if(_channel != null)
    {
        _channel.Dispose();
        _channel = null;
    }

    if(_connection != null)
    {
        try
        {
            _connection.Dispose();
        }
        catch(EndOfStreamException) 
        {
        }
        _connection = null;
    }
}
于 2012-10-04T20:23:51.817 回答