1

我有一个 tcp 连接和两个线程分别处理发送和接收。有时接收线程在程序启动几分钟后无法接收任何数据,然后从 Receive() 方法中得到一个 Timeout 错误。但是 Wireshark 显示在线上有数据,并且由于程序停止接收,接收窗口大小减小到零。

似乎这种情况在我的笔记本电脑上发生得更频繁,这是一台由 Parallels 在 Mac OS X 上运行的 Windows 7。

class Test
{
    public static string Host { get; set; }
    public static int Port { get; set; }
    public static Action<Exception> ErrorHandler { get; set; }

    private Socket socket;
    private object sendSync;

    private Thread recvThread;
    private Thread sendThread;
    private Thread connectThread;

    private readonly int RECEIVETIMOUT = 20 * 1000;
    private readonly int MAXCONNECTATTEMPTINTERVAL = 27;

    private int connectingAttempt = 0;

    private ManualResetEvent sendReadyEvent;
    private ManualResetEvent receiveReadyEvent;
    private ManualResetEvent sendStoppedEvent;
    private ManualResetEvent receiveStoppedEvent;


    private Queue<byte[]> waitingRequests;

    private Test()
    {
        sendSync = new object();
        waitingRequests = new Queue<byte[]>();

        sendReadyEvent = new ManualResetEvent(false);
        receiveReadyEvent = new ManualResetEvent(false);

        sendStoppedEvent = new ManualResetEvent(true);
        receiveStoppedEvent = new ManualResetEvent(true);

        sendThread = new Thread(this.Send);
        sendThread.IsBackground = true;
        sendThread.Start();

        recvThread = new Thread(this.Receive);
        recvThread.IsBackground = true;
        recvThread.Start();

        connectThread = new Thread(this.Connect);
        connectThread.IsBackground = true;
        connectThread.Start();

        callbackThread = new Thread(this.callback);
        callbackThread.IsBackground = true;
        callbackThread.Start();
    }

    private static Test instance = null;
    public static Test GetInstance()
    {
        if(instance == null)
            instance = new Test();
        return instance;
    }

    private void Connect()
    {
        while (true)
        {
            try
            {
                sendStoppedEvent.WaitOne();
                receiveStoppedEvent.WaitOne();

                socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                socket.ReceiveTimeout = RECEIVETIMOUT;

                var ip = Dns.GetHostAddresses(Host)[0];
                var endPoint = new IPEndPoint(ip, Port);
                socket.Connect(endPoint);

                sendStoppedEvent.Reset();
                receiveStoppedEvent.Reset();

                startSend();
                startReceive();
            }
            catch (SocketException e)
            {
                var waitSecond = (int)Math.Pow(3, connectingAttempt);
                if (waitSecond >= MAXCONNECTATTEMPTINTERVAL)
                {
                    waitSecond = MAXCONNECTATTEMPTINTERVAL;
                    if(ErrorHandler !=null)
                        ErrorHandler(e);
                }

                Thread.Sleep(waitSecond * 1000);
                connectingAttempt++;
            }
        }
    }

    public void Send(byte[] request)
    {
        lock (sendSync)
        {
            waitingRequests.Enqueue(request);
            Monitor.Pulse(sendSync);
        }
    }

    private void Send()
    {
        while (true)
        {
            sendReadyEvent.WaitOne();
            byte[] bytes;
            lock (sendSync)
            {
                if (waitingRequests.Count == 0)
                    Monitor.Wait(sendSync)
                bytes = waitingRequests.Dequeue();
            }

            int i = 0;
            while (i < bytes.Length)
            {
                try
                {
                    i += socket.Send(bytes, i, bytes.Length - i, SocketFlags.None);
                }
                catch (SocketException e)
                {
                    stopSend();
                    break;
                }
                catch (ObjectDisposedException e)
                {
                    stopSend();
                    break;
                }
            }
        }
    }

    private void stopSend()
    {
        socket.Close();
        sendReadyEvent.Reset();
        sendStoppedEvent.Set();
    }

    private void startSend()
    {
        sendReadyEvent.Set();
    }

    private int totalReceived = 0;
    private int  recevied = 0;

    private void Receive()
    {
        int RECVBUFFERSIZE = 1024 * 1024 * 2;
        byte[] recvBuffer = new byte[RECVBUFFERSIZE];
        while (true)
        {
            receiveReadyEvent.WaitOne();
            try
            {
                recevied = socket.Receive(recvBuffer, totalReceived, 1024, SocketFlags.None);
                if (recevied == 0)
                    stopReceive();
                totalReceived += recevied;
                //parse data
                ......
            }
            catch (SocketException e)
            {
                stopReceive();
            }
            catch (ObjectDisposedException e)
            {
                stopReceive();
            }
        }
    }

    private void stopReceive()
    {
        totalReceived = 0;
        recevied = 0;

        socket.Close();
        receiveReadyEvent.Reset();
        receiveStoppedEvent.Set();
    }

    private void startReceive()
    {
        receiveReadyEvent.Set();
    }
}

这是 Wireshark 转储

编号 时间 源 目的地 协议 长度 信息

4850 1971.053146 客户端服务器 TCP 90 netview-aix-3 > autodesk-nlm [PSH, ACK] Seq=7969 Ack=188674 Win=232 Len=36

编号 时间 源 目的地 协议 长度 信息

4851 1971.077669 服务器客户端 TCP 286 [TCP 窗口已满] autodesk-nlm > netview-aix-3 [PSH, ACK] Seq=188674 Ack=7969 Win=28544 Len=232

编号 时间 源 目的地 协议 长度 信息

4852 1971.077672 服务器客户端 TCP 60 autodesk-nlm > netview-aix-3 [ACK] Seq=188906 Ack=8005 Win=28544 Len=0

编号 时间 源 目的地 协议 长度 信息

4853 1971.077766 客户端服务器 TCP 189 [TCP ZeroWindow] netview-aix-3 > autodesk-nlm [PSH, ACK] Seq=8005 Ack=188906 Win=0 Len=135

4

0 回答 0