3

我有一个非常简单的客户端,我希望它可以 24/7 全天候使用消息。它在 Windows 进程中运行。

我对服务器和接收消息没有任何问题,它只是客户端。

行为如下:

如果我重新开始连接,则可以使用。过了一段时间,也许是几个小时后,我的客户处于一种奇怪的状态;它包含“持有”未确认消息的连接。

换句话说,使用 web 管理界面,我看到我总共有 2 条未确认的消息。查看我的连接,我看到 2 条未确认的消息散开。

但是没有进行任何处理。

最终,我的连接被终止,没有任何异常或日志消息被触发。这会将所有消息置于就绪状态。

我第一次尝试解决这个问题是添加一个简单的外部循环来检查 IModel、IChannel 和 QueueingBasicConsumer 的 i-var 的状态。然而,IModel/IChannel 的 IsOpen 总是报告 true,即使在 web 管理员报告没有连接处于活动状态之后,并且 QueueingBasicConsumer 的 IsRunning 也总是报告 true。

显然,我需要另一种方法来检查连接是否“活动”。

总而言之,事情最初运作良好。最终,我进入了一个奇怪的状态,我的诊断检查毫无意义,发送到服务器的消息未得到确认,并散布在任何现有的连接中。很快,我的连接被终止,没有任何调试或异常抛出,我的诊断检查仍然报告事情是洁净的。

任何帮助或最佳实践将不胜感激。我已经阅读了心跳和 IsOpen 'race' 条件,建议使用 BasicQos 并检查异常,但是我想先了解发生了什么。

这是我开始的地方:

private void StartMessageLoop(string uri, string queueName) {
    this.serverUri = uri;
    this.queueName = queueName;

    Connect(uri);
    Task.Factory.StartNew(()=> MessageLoopTask(queueName));
}

这是我的连接方式:

private void Connect(string serverAddress) {
    ConnectionFactory cf = new ConnectionFactory();

    cf.Uri = serverAddress;

    this.connection = cf.CreateConnection();

    this.connection.ConnectionShutdown += new ConnectionShutdownEventHandler(LogConnClose);
    this.channel = this.connection.CreateModel();
}

这是无限循环开始的地方:

private void MessageLoopTask(string queueName) {
    consumer = new QueueingBasicConsumer(channel);
    String consumerTag = channel.BasicConsume(queueName, false, consumer);

    while (true) {
        try {       
            BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            IBasicProperties props = e.BasicProperties;
            byte[] body = e.Body;       

            string messageContent = Encoding.UTF8.GetString(body);
            bool result = this.messageProcessor.ProcessMessage(messageContent);

            if(result){
                channel.BasicAck(e.DeliveryTag, false);
            }
            else{
                channel.BasicNack(e.DeliveryTag, false, true);
                // log
            }
        }
        catch (OperationInterruptedException ex) {
            // log
            break;
        }    
        catch(Exception e) {            
            // log      
            break;
        }
    }
    // log
}

问候, 戴恩

4

0 回答 0