TCP 服务器是使用 SocketAsyncEventArgs 开发的,它是作为 Windows 服务的异步方法。我在 Main 的开头有这 2 行代码:
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
并且两者都返回 true(记录返回值)。现在有 2000 到 3000 个客户端开始向该服务器发送消息,并且它开始接受连接(我计算了连接数,它符合预期 - 有一个连接池)。服务器进程的线程数将增加到 ~2050 到 ~3050。到现在为止还挺好!
现在有一个 Received 方法,该方法将在 ReceiveAsync 返回 true 后或通过 SocketAsyncEventArgs 的 Completed 事件调用。
问题就从这里开始了:无论连接了多少客户端,发送了多少消息,Received 最多在一秒钟内被调用 20 次!随着客户数量的增加,这个数字(20)下降到约 10。
环境:在同一台机器上模拟 TCP Server 和客户端。我在 2 台机器上测试了代码,一台有 2 核 CPU 和 4GB RAM,另一台有 8 核 CPU 和 12GB RAM。没有数据丢失(还),有时我在每个接收操作中收到超过 1 条消息。没关系。但是如何才能增加接收操作的数量呢?
关于实现的附加说明:代码很大,包含许多不同的逻辑。总体描述是:我有一个 SocketAsyncEventArgs 用于接受新连接。它工作得很好。现在,对于每个新接受的连接,我都会创建一个新的 SocketAsyncEventArgs 来接收数据。我把这个(为接收创建的 SocketAsyncEventArgs)放在一个池中。它不会被重用,但它的 UserToken 被用于跟踪连接;例如,那些断开的连接或那些 7 分钟没有发送任何数据的连接将被关闭和处置(SocketAsyncEventArgs 的 AcceptSocket 将被关闭(两者)、关闭和处置,SocketAsyncEventArgs 对象本身也是如此)。
class Sudo
{
Socket _listener;
int _port = 8797;
public Sudo()
{
var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(ipEndPoint);
_listener.Listen(100);
Accept(null);
}
void Accept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptCompleted;
}
else acceptEventArg.AcceptSocket = null;
bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
if (!willRaiseEvent) Accepted(acceptEventArg);
}
void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
Accepted(e);
}
void Accepted(SocketAsyncEventArgs e)
{
var acceptSocket = e.AcceptSocket;
var readEventArgs = CreateArg(acceptSocket);
var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
Accept(e);
if (!willRaiseEvent) Received(readEventArgs);
}
SocketAsyncEventArgs CreateArg(Socket acceptSocket)
{
var arg = new SocketAsyncEventArgs();
arg.Completed += IOCompleted;
var buffer = new byte[64 * 1024];
arg.SetBuffer(buffer, 0, buffer.Length);
arg.AcceptSocket = acceptSocket;
arg.SocketFlags = SocketFlags.None;
return arg;
}
void IOCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
Received(e);
break;
default: break;
}
}
void Received(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
{
// Kill(e);
return;
}
var bytesList = new List<byte>();
for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
var bytes = bytesList.ToArray();
Process(bytes);
ReceiveRest(e);
Perf.IncOp();
}
void ReceiveRest(SocketAsyncEventArgs e)
{
e.SocketFlags = SocketFlags.None;
for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
e.SetBuffer(0, e.Buffer.Length);
var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
if (!willRaiseEvent) Received(e);
}
void Process(byte[] bytes) { }
}