我有一个非常简单的客户端,我希望它可以 24/7 全天候使用消息。它在 Windows 进程中运行。
我对服务器和接收消息没有任何问题,它只是客户端。
行为如下:
如果我重新开始连接,则可以使用。过了一段时间,也许是几个小时后,我的客户处于一种奇怪的状态;它包含“持有”未确认消息的连接。
换句话说,使用 web 管理界面,我看到我总共有 2 条未确认的消息。查看我的连接,我看到 2 条未确认的消息散开。
但是没有进行任何处理。
最终,我的连接被终止,没有任何异常或日志消息被触发。这会将所有消息置于就绪状态。
我第一次尝试解决这个问题是添加一个简单的外部循环来检查 IModel、IChannel 和 QueueingBasicConsumer 的 i-var 的状态。然而,IModel/IChannel 的 IsOpen 总是报告 true,即使在 web 管理员报告没有连接处于活动状态之后,并且 QueueingBasicConsumer 的 IsRunning 也总是报告 true。
显然,我需要另一种方法来检查连接是否“活动”。
总而言之,事情最初运作良好。最终,我进入了一个奇怪的状态,我的诊断检查毫无意义,发送到服务器的消息未得到确认,并散布在任何现有的连接中。很快,我的连接被终止,没有任何调试或异常抛出,我的诊断检查仍然报告事情是洁净的。
任何帮助或最佳实践将不胜感激。我已经阅读了心跳和 IsOpen 'race' 条件,建议使用 BasicQos 并检查异常,但是我想先了解发生了什么。
这是我开始的地方:
private void StartMessageLoop(string uri, string queueName) {
this.serverUri = uri;
this.queueName = queueName;
Connect(uri);
Task.Factory.StartNew(()=> MessageLoopTask(queueName));
}
这是我的连接方式:
private void Connect(string serverAddress) {
ConnectionFactory cf = new ConnectionFactory();
cf.Uri = serverAddress;
this.connection = cf.CreateConnection();
this.connection.ConnectionShutdown += new ConnectionShutdownEventHandler(LogConnClose);
this.channel = this.connection.CreateModel();
}
这是无限循环开始的地方:
private void MessageLoopTask(string queueName) {
consumer = new QueueingBasicConsumer(channel);
String consumerTag = channel.BasicConsume(queueName, false, consumer);
while (true) {
try {
BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
string messageContent = Encoding.UTF8.GetString(body);
bool result = this.messageProcessor.ProcessMessage(messageContent);
if(result){
channel.BasicAck(e.DeliveryTag, false);
}
else{
channel.BasicNack(e.DeliveryTag, false, true);
// log
}
}
catch (OperationInterruptedException ex) {
// log
break;
}
catch(Exception e) {
// log
break;
}
}
// log
}
问候, 戴恩