我的 TCPClient 会话有问题。
的背景:
- 我运行批量 SMS 服务,我们最近从接收网络移动运营商 (MNO) 的新消息切换到收听新消息
- 我写了一个类来监听新消息(见下面的代码)
- 我给班级打了 15 次电话,因为我需要为 MNO 的每个帐户运行一个线程
- 每个线程通过特定帐户的订阅信息发送并等待消息
- 当我运行代码时,它调用并创建 15 个线程并指示所有线程都已连接
- 我每 60 分钟停止并启动(重新创建)15 个线程。每次刷新,我的日志都显示每个线程都连接正确
- 我需要 24/7 全天候的监听服务
问题:
- 尽管线程看起来像是连接的,但我有一些情况是我测试服务并且没有消息通过侦听服务。尽管 MNO 指示所有线程都已连接,但几乎就像线程已经进入睡眠状态
- 我可以做些什么来确保我总是收到入站消息?
这是我的监听服务代码。
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using log4net;
using MessageManagerWinService.Util;
using MessageManagerWinService.DAL;
using MessageManagerWinService.Classes;
using System.Linq;
namespace MessageManagerWinService.MTNListener
{
public class Listener {
internal static readonly ILog log = LogManager.GetLogger(typeof(Listener));
private ManualResetEvent _stopEvent;
private Thread _listenerThread;
private bool _running;
private bool _connected;
private DateTime _lastSubscribe;
private TcpClient _client;
private NetworkStream _stream;
public string _listenType { get; set; }
public string _username { get; set; }
public string _password { get; set; }
public void Start()
{
_stopEvent = new ManualResetEvent(false);
//_listenType = listenType.ToUpper();
_listenerThread = new Thread(ListenerThreadProc);
_running = true;
_listenerThread.Start();
}
public void Stop() {
_running = false;
try
{
_client.Close();
log.Info(_listenType + " Client Closed - " + " - " + DateTime.Now);
}
catch (Exception)
{
log.Info("Tried Closing Client - Client not open - " + _listenType + " - " + DateTime.Now);
}
Thread.Sleep(1000); // or more
if (_listenerThread.IsAlive)
{
log.Info("Aborting thread - " + _listenType + " - " + DateTime.Now);
_listenerThread.Abort();
}
}
private void Reconnect()
{
int srvInd = 0;
while (_running && !_connected)
{
string addr = "";
if (_listenType == "USSD")
{
addr = AppSettings.Instance.ReceiveGatewayUSSD[srvInd];
}
else if (_listenType == "ACCOUNTS")
{
addr = AppSettings.Instance.ReceiveGatewayAccounts[srvInd];
}
log.Info("Connecting to " + _listenType +" IP Address : " + addr + "...");
var ipAddr = addr.Split(':')[0];
var port = Int32.Parse(addr.Split(':')[1]);
_client = new TcpClient();
IAsyncResult result = _client.BeginConnect(ipAddr, port, null, null);
WaitHandle timeoutHandler = result.AsyncWaitHandle;
try
{
if (!result.AsyncWaitHandle.WaitOne(TimeSpan.FromSeconds(AppSettings.Instance.ConnectTimeout), false))
{
_client.Close();
_connected = false;
log.Info("Timeout connecting to " + _listenType);
} else
{
//log.Info("Connected to " + _listenType);
_client.EndConnect(result);
_stream = _client.GetStream();
//_stream.ReadTimeout = AppSettings.Instance.ReadTimeout*1000;
_connected = true;
}
}
catch (Exception ex)
{
_connected = false;
log.Error("Error connecting to " + _listenType + ". Error: " + ex.Message, ex);
}
finally
{
timeoutHandler.Close();
}
if (!_connected)
{
srvInd++;
if (srvInd >= addr.Length)
{
srvInd = 0;
}
}
}
if (_running && _connected)
{
Subscribe();
}
}
protected void SendString(string str)
{
var buf = Encoding.ASCII.GetBytes(str + "\r\n");
_stream.Write(buf, 0, buf.Length);
}
private string FixPattern(string code)
{
return code.Replace("*", "\\*");
}
private void Subscribe() {
_lastSubscribe = DateTime.Now;
if (_listenType == "USSD")
{
//Subscribe to USSD service
SendString("<usereq USERNAME='" + _username + "' PASSWORD ='" + _password + "' VERBOSITY='0'>");
SendString("<subscribe NODE='.*' TRANSFORM='USSD' PATTERN='\\*'/>");
SendString("</usereq>END");
log.Info("Subscription to USSD - Connected - " + DateTime.Now.ToString());
}
else if (_listenType == "ACCOUNTS")
{
SendString("<usereq USERNAME='" + _username + "' PASSWORD ='" + _password + "'>");
//MTN_Accounts_SendString("<subscribe NODE='pollSmppTopic' TRANSFORM='DELIVER_SM'>");
SendString("<subscribe NODE='" + _username + "' TRANSFORM='DELIVER_SM'>");
SendString("<pattern><![CDATA[$short_message~='.*']]></pattern>");
SendString("</subscribe>");
SendString("</usereq>END");
log.Info("Subscription to Accounts (" + _username + ") - Connected - " + DateTime.Now.ToString());
}
}
private bool Eol(byte[] buf) {
if (buf.Length < 1) {
return false;
}
return buf[buf.Length - 1] == 0x0a;
}
protected string ReadString()
{
var ms = new MemoryStream();
while (!Eol(ms.ToArray()))
{
try
{
byte[] buf = new byte[1];
int len = _stream.Read(buf, 0, 1);
if (len > 0)
{
ms.WriteByte(buf[0]);
}
else
{
break;
}
}
catch(IOException)
{
break;
}
}
if (ms.Length==0)
{
return null;
}
return Encoding.ASCII.GetString(ms.ToArray()).TrimEnd(new char[] { '\r', '\n' });
}
private void ListenerThreadProc()
{
_connected = false;
while (_running)
{
try
{
if (!_connected)
{
//log.Info("_listenType + ("+ _username + ") - Disconnected. Attempting to reconnect. DateTime: " + DateTime.Now);
Reconnect();
}
//log.Info(_listenType +" ("+ _username + ") - Total minutes elapsed since last refresh : " +DateTime.Now.Subtract(_lastSubscribe).TotalMinutes);
if (DateTime.Now.Subtract(_lastSubscribe).TotalMinutes >= AppSettings.Instance.SubscribeInterval)
{
//log.Info(_listenType + " (" + _username + ") - Refreshing Thread : " + DateTime.Now.Subtract(_lastSubscribe).TotalMinutes);
EmailService sendMail = new EmailService();
sendMail.SendEmail("ADMIN", "ALERT - Message Manager : Re-subscribing to " + _listenType + " listening service", "", null, "");
Subscribe();
}
string line;
string message = "";
while ((line = ReadString()) != null)
{
using (var fs = new StreamWriter(Path.Combine(Path.GetDirectoryName(Application.ExecutablePath), AppSettings.Instance.MessageFilename), true))
{
fs.WriteLine(line);
}
log.Debug("Received: " + line);
if (line.Contains("<usarsp"))
{
message = "";
message = @"<?xml version=""1.0"" encoding=""UTF-8""?>";
message = message + line;
}
else if (line.Contains("</usarsp>"))
{
message = message + line;
ProcessMTNMessage processMessage = new ProcessMTNMessage();
processMessage.processMessage(_listenType, message);
}
else
{
message = message + line;
}
}
if(!_client.Connected)
{
_connected = false;
try
{
_stream.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
Thread.Sleep(100);
}
catch(IOException)
{
log.Info("Disconnected.");
_connected = false;
try
{
_stream.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
try
{
_client.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
catch(ThreadInterruptedException ex)
{
log.Error("Thread Interrupted Error: " + ex.Message, ex);
}
catch(Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
if(_connected)
{
try
{
_stream.Close();
}
catch(Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
try
{
_client.Close();
}
catch (Exception ex)
{
log.Error("Exception Error: " + ex.Message, ex);
}
}
}
}
}