为了工作,我需要编写一个能够使用 TCP 作为协议处理多达 8000/10000 个活动连接的服务器。
我以前使用过 IOCP,但我遇到了一个奇怪的情况,即服务器在高负载后开始丢失事件!
为了测试架构,我编写了一个简单的发送回你接收的所有客户端和服务器,它可以正常工作到 1000/2000 个客户端,但是当我开始获得更多连接时,不会调用 IOCP 回调!
服务器和客户端在我的本地机器上运行
如果我在 1ms 的发送/接收事件中间放置一个 Thread.Sleep 一切正常,但服务器可以处理的网络吞吐量会下降很多。我尝试了与 Yield 非常相似的 Thread.Yield 和 Thread.Sleep(0),但没有有用的结果。
我的开发机器是 i7-3770,有 8gb 的内存,所以它能够处理这个负载!使用 Thread.Sleep,CPU 消耗低于 30%,我可以处理多达 10000 个连接而不会丢失事件,但吞吐量为 200mbps(客户端+服务器)......当系统没有丢失事件时,没有线程。睡眠,服务器能够处理高达 8gbps 的吞吐量!
我正在使用 8192 缓冲区大小。
为了最好地优化,我的代码预初始化所有必要的读取缓冲区和读/写 SocketAsyncEventArgs(在下面的代码中有一个初始化,但我有一个超类,它扩展了下面的一个并预初始化所有必要的。
这里有一些服务器部分的代码
public bool SendAsync(byte[] Buffer, int Offset, int Length)
{
// Verifica se è connesso
if (this.IsConnected == false)
{
return false;
}
// Attende un eventuale invio in corso
this.sendAsyncAutoResetEvent.WaitOne();
if (this.sendSocketAsyncEventArgs == null)
{
// Inizializza l'oggetto contenente le informazioni per la gestione asincrona (IOCP)
// dell'invio dei dati
this.sendSocketAsyncEventArgs = new SocketAsyncEventArgs();
this.sendSocketAsyncEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(
this.SocketAsyncCallback);
}
try
{
this.sendSocketAsyncEventArgs.SetBuffer(Buffer, Offset, Length);
}
catch (ObjectDisposedException)
{
this.sendAsyncAutoResetEvent.Set();
return false;
}
catch (NullReferenceException)
{
this.sendAsyncAutoResetEvent.Set();
return false;
}
return this.SendAsyncInternal();
}
private bool SendAsyncInternal()
{
try
{
// Prova ad avviare la ricezione asincrona
if (this.socket.SendAsync(this.sendSocketAsyncEventArgs) == false)
{
// L'operazione si è completata in modo sincrono, gestisce gli eventi
this.SocketAsyncCallback(this, this.sendSocketAsyncEventArgs);
}
}
catch (SocketException e)
{
// Imposta l'eccezione
this.lastSocketException = e;
this.sendAsyncAutoResetEvent.Set();
return false;
}
catch (ObjectDisposedException)
{
this.sendAsyncAutoResetEvent.Set();
return false;
}
catch (NullReferenceException)
{
this.sendAsyncAutoResetEvent.Set();
return false;
}
return true;
}
protected void SocketAsyncCallback(object sender, SocketAsyncEventArgs e)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ProcessSocketEvent), e);
}
private void ProcessSocketEvent(object State)
{
SocketAsyncEventArgs e = State as SocketAsyncEventArgs;
try
{
if
(
// Richiesta chiusura della connessione
(
e.LastOperation != SocketAsyncOperation.Connect
&&
e.SocketError == SocketError.Success
&&
e.BytesTransferred == 0
)
||
e.SocketError == SocketError.OperationAborted
||
e.SocketError == SocketError.ConnectionReset
)
{
// Termina la connessione
this.Close();
}
else if (e.SocketError == SocketError.ConnectionRefused)
{
this.ProcessSocketConnectRefused(e);
}
else
{
// Verifica la presenza di eventuali errori
if (e.SocketError != SocketError.Success)
{
throw new Exception(e.SocketError.ToString());
}
switch (e.LastOperation)
{
case SocketAsyncOperation.Connect:
this.ProcessSocketConnect(e);
break;
case SocketAsyncOperation.Receive:
this.ProcessSocketReceive(e);
break;
case SocketAsyncOperation.Send:
this.ProcessSocketSend(e);
this.sendAsyncAutoResetEvent.Set();
break;
default:
break;
}
}
}
catch (ObjectDisposedException)
{
// do nothing
}
catch (NullReferenceException)
{
// do nothing
}
}
private void ProcessSocketConnect(SocketAsyncEventArgs e)
{
this.OnConnectionEstablished();
}
private void ProcessSocketConnectRefused(SocketAsyncEventArgs e)
{
this.OnConnectionRefused();
}
private void ProcessSocketReceive(SocketAsyncEventArgs e)
{
this.lastPacketRecievedAt = DateTime.Now;
this.OnDataReceived(e.Buffer, e.Offset, e.BytesTransferred);
}
private void ProcessSocketSend(SocketAsyncEventArgs e)
{
this.lastPacketSendedAt = DateTime.Now;
this.OnDataSended(e.Buffer, e.Offset, e.BytesTransferred);
}
谢谢你的帮助!
编辑
我忘记指定操作系统是 Windows 8 64Bit