0

我的 TCPClient 会话有问题。

的背景:

  • 我运行批量 SMS 服务,我们最近从接收网络移动运营商 (MNO) 的新消息切换到收听新消息
  • 我写了一个类来监听新消息(见下面的代码)
  • 我给班级打了 15 次电话,因为我需要为 MNO 的每个帐户运行一个线程
  • 每个线程通过特定帐户的订阅信息发送并等待消息
  • 当我运行代码时,它调用并创建 15 个线程并指示所有线程都已连接
  • 我每 60 分钟停止并启动(重新创建)15 个线程。每次刷新,我的日志都显示每个线程都连接正确
  • 我需要 24/7 全天候的监听服务

问题:

  • 尽管线程看起来像是连接的,但我有一些情况是我测试服务并且没有消息通过侦听服务。尽管 MNO 指示所有线程都已连接,但几乎就像线程已经进入睡眠状态
  • 我可以做些什么来确保我总是收到入站消息?

这是我的监听服务代码。

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using log4net;
using MessageManagerWinService.Util;
using MessageManagerWinService.DAL;
using MessageManagerWinService.Classes;
using System.Linq;


namespace MessageManagerWinService.MTNListener
{
public class Listener {
    internal static readonly ILog log = LogManager.GetLogger(typeof(Listener));

    private ManualResetEvent _stopEvent;
    private Thread _listenerThread;
    private bool _running;
    private bool _connected;
    private DateTime _lastSubscribe;
    private TcpClient _client;
    private NetworkStream _stream;

    public string _listenType { get; set; }
    public string _username { get; set; }
    public string _password { get; set; }

    public void Start() 
    {
        _stopEvent = new ManualResetEvent(false);

        //_listenType = listenType.ToUpper();
        _listenerThread = new Thread(ListenerThreadProc);
        _running = true;
        _listenerThread.Start();
    }

    public void Stop() {
        _running = false;

        try
        {
            _client.Close();
            log.Info(_listenType + " Client Closed - " + " - " + DateTime.Now);
        }
        catch (Exception) 
        {
            log.Info("Tried Closing Client - Client not open - " + _listenType + " - " + DateTime.Now);
        }

        Thread.Sleep(1000); // or more
        if (_listenerThread.IsAlive)
        {
            log.Info("Aborting thread - " + _listenType + " - " + DateTime.Now);
            _listenerThread.Abort();
        }
    }

    private void Reconnect() 
    {
        int srvInd = 0;

        while (_running && !_connected) 
        {
            string addr = "";

            if (_listenType == "USSD")
            {
                addr = AppSettings.Instance.ReceiveGatewayUSSD[srvInd];
            }
            else if (_listenType == "ACCOUNTS")
            {
                addr = AppSettings.Instance.ReceiveGatewayAccounts[srvInd];
            }

            log.Info("Connecting to " + _listenType +" IP Address : " + addr + "...");

            var ipAddr = addr.Split(':')[0];
            var port = Int32.Parse(addr.Split(':')[1]);

            _client = new TcpClient();
            IAsyncResult result = _client.BeginConnect(ipAddr, port, null, null);

            WaitHandle timeoutHandler = result.AsyncWaitHandle;

            try 
            {
                if (!result.AsyncWaitHandle.WaitOne(TimeSpan.FromSeconds(AppSettings.Instance.ConnectTimeout), false))
                {
                    _client.Close();
                    _connected = false;
                    log.Info("Timeout connecting to " + _listenType);
                } else 
                {
                    //log.Info("Connected to " + _listenType);
                    _client.EndConnect(result);
                    _stream = _client.GetStream();
                    //_stream.ReadTimeout = AppSettings.Instance.ReadTimeout*1000;
                    _connected = true;
                }
            } 
            catch (Exception ex) 
            {
                _connected = false;
                log.Error("Error connecting to " + _listenType + ". Error: " + ex.Message, ex);
            } 
            finally 
            {
                timeoutHandler.Close();
            }

            if (!_connected) 
            {
                srvInd++;
                if (srvInd >= addr.Length)
                {
                    srvInd = 0;
                }
            }
        }

        if (_running && _connected) 
        {
            Subscribe();
        }
    }

    protected void SendString(string str) 
    {
        var buf = Encoding.ASCII.GetBytes(str + "\r\n");
        _stream.Write(buf, 0, buf.Length);
    }

    private string FixPattern(string code) 
    {
        return code.Replace("*", "\\*");
    }

    private void Subscribe() {
        _lastSubscribe = DateTime.Now;

        if (_listenType == "USSD")
        {
            //Subscribe to USSD service
            SendString("<usereq USERNAME='" + _username + "' PASSWORD ='" + _password + "' VERBOSITY='0'>");
            SendString("<subscribe NODE='.*' TRANSFORM='USSD' PATTERN='\\*'/>");
            SendString("</usereq>END");

            log.Info("Subscription to USSD - Connected - " + DateTime.Now.ToString());
        }
        else if (_listenType == "ACCOUNTS")
        {
            SendString("<usereq USERNAME='" + _username + "' PASSWORD ='" + _password + "'>");
            //MTN_Accounts_SendString("<subscribe NODE='pollSmppTopic' TRANSFORM='DELIVER_SM'>");
            SendString("<subscribe NODE='" + _username + "' TRANSFORM='DELIVER_SM'>");
            SendString("<pattern><![CDATA[$short_message~='.*']]></pattern>");
            SendString("</subscribe>");
            SendString("</usereq>END");

            log.Info("Subscription to Accounts (" + _username + ") - Connected - " + DateTime.Now.ToString());
        }
    }

    private bool Eol(byte[] buf) {
        if (buf.Length < 1) {
            return false;
        }
        return buf[buf.Length - 1] == 0x0a;
    }

    protected string ReadString() 
    {
        var ms = new MemoryStream();
        while (!Eol(ms.ToArray())) 
        {
            try 
            {
                byte[] buf = new byte[1];
                int len = _stream.Read(buf, 0, 1);
                if (len > 0) 
                {
                    ms.WriteByte(buf[0]);
                }
                else 
                {
                    break;
                }
            } 
            catch(IOException) 
            {
                break;
            }
        }
        if (ms.Length==0) 
        {
            return null;
        }
        return Encoding.ASCII.GetString(ms.ToArray()).TrimEnd(new char[] { '\r', '\n' });
    }

    private void ListenerThreadProc() 
    {
        _connected = false;
        while (_running) 
        {
            try 
            {
                if (!_connected) 
                {
                    //log.Info("_listenType + ("+ _username + ") - Disconnected. Attempting to reconnect. DateTime: " + DateTime.Now);
                    Reconnect();
                }

                //log.Info(_listenType +" ("+ _username + ") - Total minutes elapsed since last refresh : " +DateTime.Now.Subtract(_lastSubscribe).TotalMinutes);

                if (DateTime.Now.Subtract(_lastSubscribe).TotalMinutes >= AppSettings.Instance.SubscribeInterval) 
                {
                    //log.Info(_listenType + " (" + _username + ") - Refreshing Thread : " + DateTime.Now.Subtract(_lastSubscribe).TotalMinutes);
                    EmailService sendMail = new EmailService();
                    sendMail.SendEmail("ADMIN", "ALERT - Message Manager : Re-subscribing to " + _listenType + " listening service", "", null, "");

                    Subscribe();
                }

                string line;
                string message = "";

                while ((line = ReadString()) != null)
                {
                    using (var fs = new StreamWriter(Path.Combine(Path.GetDirectoryName(Application.ExecutablePath), AppSettings.Instance.MessageFilename), true)) 
                    {
                        fs.WriteLine(line);
                    }

                    log.Debug("Received: " + line);

                    if (line.Contains("<usarsp"))
                    {
                        message = "";
                        message = @"<?xml version=""1.0"" encoding=""UTF-8""?>";
                        message = message + line;
                    }
                    else if (line.Contains("</usarsp>"))
                    {
                        message = message + line;

                        ProcessMTNMessage processMessage = new ProcessMTNMessage();
                        processMessage.processMessage(_listenType, message);
                    }
                    else
                    {
                        message = message + line;
                    }
                }

                if(!_client.Connected) 
                {
                    _connected = false;
                    try 
                    {
                        _stream.Close();
                    } 
                    catch (Exception ex) 
                    {
                        log.Error("Exception Error: " + ex.Message, ex);
                    }
                }

                Thread.Sleep(100);
            } 
            catch(IOException) 
            {
                log.Info("Disconnected.");
                _connected = false;
                try 
                {
                    _stream.Close();
                } 
                catch (Exception ex) 
                {
                    log.Error("Exception Error: " + ex.Message, ex);
                }

                try 
                {
                    _client.Close();
                } 
                catch (Exception ex) 
                {
                    log.Error("Exception Error: " + ex.Message, ex);
                }
            } 
            catch(ThreadInterruptedException ex) 
            {
                log.Error("Thread Interrupted Error: " + ex.Message, ex);
            } 
            catch(Exception ex) 
            {
                log.Error("Exception Error: " + ex.Message, ex);
            }
        }

        if(_connected) 
        {
            try 
            {
                _stream.Close();
            } 
            catch(Exception ex) 
            {
                log.Error("Exception Error: " + ex.Message, ex);
            }
            try 
            {
                _client.Close();
            } 
            catch (Exception ex) 
            {
                log.Error("Exception Error: " + ex.Message, ex);
            }
        }
    }

}

}

4

1 回答 1

0

看起来您没有定期发送 keepalive 消息;我在我的博客上描述了这一点。

另外,我建议您使用同步(例如,Read)或异步(例如,BeginConnect)方法,但不能同时使用。我通常推荐异步方法;它们更难编码,但更有弹性。

您的信息框架(正如我在博客中所描述的)也不正确;您没有正确处理两个更新可以背靠背到达的情况。

坦率地说,现在几乎所有关于代码的内容都是错误的。我强烈建议您不要实现自定义 TCP/IP 协议,因为您真的不需要。只需使用 SignalR、WebAPI 或 WCF,它们会为您处理协议的困难部分。你的代码最终会变得更干净、更可靠。

于 2013-09-08T19:36:24.240 回答