0

为了工作,我需要编写一个能够使用 TCP 作为协议处理多达 8000/10000 个活动连接的服务器。

我以前使用过 IOCP,但我遇到了一个奇怪的情况,即服务器在高负载后开始丢失事件!

为了测试架构,我编写了一个简单的发送回你接收的所有客户端和服务器,它可以正常工作到 1000/2000 个客户端,但是当我开始获得更多连接时,不会调用 IOCP 回调!

服务器和客户端在我的本地机器上运行

如果我在 1ms 的发送/接收事件中间放置一个 Thread.Sleep 一切正常,但服务器可以处理的网络吞吐量会下降很多。我尝试了与 Yield 非常相似的 Thread.Yield 和 Thread.Sleep(0),但没有有用的结果。

我的开发机器是 i7-3770,有 8gb 的内存,所以它能够处理这个负载!使用 Thread.Sleep,CPU 消耗低于 30%,我可以处理多达 10000 个连接而不会丢失事件,但吞吐量为 200mbps(客户端+服务器)......当系统没有丢失事件时,没有线程。睡眠,服务器能够处理高达 8gbps 的吞吐量!

我正在使用 8192 缓冲区大小。

为了最好地优化,我的代码预初始化所有必要的读取缓冲区和读/写 SocketAsyncEventArgs(在下面的代码中有一个初始化,但我有一个超类,它扩展了下面的一个并预初始化所有必要的。

这里有一些服务器部分的代码

    public bool SendAsync(byte[] Buffer, int Offset, int Length)
    {
        // Verifica se è connesso
        if (this.IsConnected == false)
        {
            return false;
        }

        // Attende un eventuale invio in corso
        this.sendAsyncAutoResetEvent.WaitOne();

        if (this.sendSocketAsyncEventArgs == null)
        {
            // Inizializza l'oggetto contenente le informazioni per la gestione asincrona (IOCP)
            // dell'invio dei dati
            this.sendSocketAsyncEventArgs = new SocketAsyncEventArgs();
            this.sendSocketAsyncEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(
                this.SocketAsyncCallback);
        }

        try
        {
            this.sendSocketAsyncEventArgs.SetBuffer(Buffer, Offset, Length);
        }
        catch (ObjectDisposedException)
        {
            this.sendAsyncAutoResetEvent.Set();
            return false;
        }
        catch (NullReferenceException)
        {
            this.sendAsyncAutoResetEvent.Set();
            return false;
        }

        return this.SendAsyncInternal();
    }

    private bool SendAsyncInternal()
    {
        try
        {
            // Prova ad avviare la ricezione asincrona
            if (this.socket.SendAsync(this.sendSocketAsyncEventArgs) == false)
            {
                // L'operazione si è completata in modo sincrono, gestisce gli eventi
                this.SocketAsyncCallback(this, this.sendSocketAsyncEventArgs);
            }
        }
        catch (SocketException e)
        {
            // Imposta l'eccezione
            this.lastSocketException = e;
            this.sendAsyncAutoResetEvent.Set();

            return false;
        }
        catch (ObjectDisposedException)
        {
            this.sendAsyncAutoResetEvent.Set();
            return false;
        }
        catch (NullReferenceException)
        {
            this.sendAsyncAutoResetEvent.Set();
            return false;
        }

        return true;
    }

    protected void SocketAsyncCallback(object sender, SocketAsyncEventArgs e)
    {
        ThreadPool.QueueUserWorkItem(new WaitCallback(this.ProcessSocketEvent), e);
    }

    private void ProcessSocketEvent(object State)
    {
        SocketAsyncEventArgs e = State as SocketAsyncEventArgs;

        try
        {
            if
            (
                // Richiesta chiusura della connessione
                (
                    e.LastOperation != SocketAsyncOperation.Connect
                    &&
                    e.SocketError == SocketError.Success
                    &&
                    e.BytesTransferred == 0
                )
                ||
                e.SocketError == SocketError.OperationAborted
                ||
                e.SocketError == SocketError.ConnectionReset
            )
            {
                // Termina la connessione
                this.Close();
            }
            else if (e.SocketError == SocketError.ConnectionRefused)
            {
                this.ProcessSocketConnectRefused(e);
            }
            else
            {
                // Verifica la presenza di eventuali errori
                if (e.SocketError != SocketError.Success)
                {
                    throw new Exception(e.SocketError.ToString());
                }

                switch (e.LastOperation)
                {
                    case SocketAsyncOperation.Connect:

                        this.ProcessSocketConnect(e);
                        break;

                    case SocketAsyncOperation.Receive:

                        this.ProcessSocketReceive(e);
                        break;

                    case SocketAsyncOperation.Send:

                        this.ProcessSocketSend(e);
                        this.sendAsyncAutoResetEvent.Set();
                        break;

                    default:
                        break;
                }
            }
        }
        catch (ObjectDisposedException)
        {
            // do nothing
        }
        catch (NullReferenceException)
        {
            // do nothing
        }
    }

    private void ProcessSocketConnect(SocketAsyncEventArgs e)
    {
        this.OnConnectionEstablished();
    }

    private void ProcessSocketConnectRefused(SocketAsyncEventArgs e)
    {
        this.OnConnectionRefused();
    }

    private void ProcessSocketReceive(SocketAsyncEventArgs e)
    {
        this.lastPacketRecievedAt = DateTime.Now;
        this.OnDataReceived(e.Buffer, e.Offset, e.BytesTransferred);
    }

    private void ProcessSocketSend(SocketAsyncEventArgs e)
    {
        this.lastPacketSendedAt = DateTime.Now;
        this.OnDataSended(e.Buffer, e.Offset, e.BytesTransferred);
    }

谢谢你的帮助!

编辑

我忘记指定操作系统是 Windows 8 64Bit

4

0 回答 0