我正在构建一个网络库以用于进一步的项目,我知道一点网络(一般和编程),但是当我想高频传输更大的数据包时我仍然遇到问题。这在 TCP 中怎么可能实现?一种可能的解决方案是分块,这根本不是问题,我只需将它们放在缓冲区中并再次获取整个数据包。这会导致服务器开销,处理大量数据会消耗大量资源。它会影响稳定性,因为服务器卡在某个地方并且客户端在 BeginSend 抛出 ObjectDisposedException (连接已关闭)。当然这可能是我的错,但考虑一个 2048 字节(2 kB)的“相对”小数据包,它被分块为 512 个字节(4 个块):
//main.cs
static void Main(string[] args)
{
client = new Client("82.72.201.150", 2345);
client.SendCompleted += OnSend;
Console.WriteLine(client.Connect());
var bytes = new byte[2048];
for (int i = 0; i < bytes.Length; i++)
{
bytes[i] = (byte)((i*i + 0x25)%byte.MaxValue);
}
while (true)
{
SendChuncked(bytes);
Thread.Sleep(500);
}
}
static void SendChuncked(byte[] buffer, int chunksize = 512)
{
var chunks = buffer.Length/chunksize;
var rest = buffer.Length%chunksize;
var size = BitConverter.GetBytes(buffer.Length);
client.Send(size, 0, size.Length);
for (int i = 0; i < chunks; i++)
{
client.Send(buffer, i * chunksize, chunksize);
}
if (rest > 0)
client.Send(buffer, chunks * chunksize, rest);
}
//....
//client.cs
public struct TransferState
{
public byte[] buffer;
public int offset;
public int count;
public int handled;
public ManualResetEvent waitHandle;
public bool wait;
}
public void Send(byte[] buffer, int offset, int count)
{
var size = BitConverter.GetBytes(count);
Send(size, 0, size.Length, true);
Send(buffer, offset, count, false);
}
private void Send(byte[] buffer, int offset, int count, bool wait, TransferState? state = null)
{
state = state ?? new TransferState
{
buffer = buffer,
offset = offset,
count = count,
handled = 0,
wait = wait,
waitHandle = new ManualResetEvent(false)
};
socket.BeginSend(buffer, offset, count, SocketFlags.None, SendCallback, state);
if (wait)
{
((TransferState) state).waitHandle.WaitOne();
}
}
private void Send(byte[] buffer, int offset, int count, bool wait, TransferState? state = null)
{
state = state ?? new TransferState
{
buffer = buffer,
offset = offset,
count = count,
handled = 0,
wait = wait,
waitHandle = new ManualResetEvent(false)
};
socket.BeginSend(buffer, offset, count, SocketFlags.None, SendCallback, state);
if (wait)
{
((TransferState) state).waitHandle.WaitOne();
}
}
private void SendCallback(IAsyncResult result)
{
if (result.AsyncState is TransferState == false)
throw new ArgumentException("Invalid type of state.", "state");
var state = (TransferState)result.AsyncState;
var sent = socket.EndSend(result);
state.handled += sent;
var tosent = state.count - state.handled;
var offset = state.offset + state.handled;
if (tosent > 0)
{
Send(state.buffer, offset, tosent, state.wait, state);
}
else
{
state.waitHandle.Set();
SendCompleted(this, new TransferCompletedArgs(this, state));
}
}
//....
public void Receive(TransferState? state = null)
{
if (state == null)
{
var buffer = new byte[sizeof(int)];
socket.BeginReceive(buffer, 0, sizeof (int), SocketFlags.None, ReceiveCallback, buffer);
}
else
{
var transferState = state.Value;
socket.BeginReceive(transferState.buffer, transferState.offset, transferState.count - transferState.handled, SocketFlags.None, ReceiveCallback, transferState);
}
}
private void ReceiveCallback(IAsyncResult result)
{
//receiving the amount to receive
if (result.AsyncState is byte[])
{
var buffer = (byte[])result.AsyncState;
var rec = socket.EndReceive(result);
if (rec != 4) //TODO: HANDLE MORE PROPERLY
throw new NotImplementedException("Error while receiving the amoount to receive.");
var toreceive = BitConverter.ToInt32(buffer, 0);
var state = new TransferState
{
buffer = new byte[toreceive],
count = toreceive,
wait = false
};
Receive(state);
}
//know we know the amount we can receive it till the end
else if (result.AsyncState is TransferState)
{
var state = (TransferState)result.AsyncState;
var rec = socket.EndReceive(result);
state.offset += rec;
state.handled += rec;
var toreceive = state.count - state.handled;
if (toreceive > 0)
{
Receive(state);
Debug.WriteLine("[{2}] size mismatch: {0}/{1}", state.handled, state.count, DateTime.Now.ToString("mm:ss.fff"));
}
else
{
ReceiveCompleted(this, new TransferCompletedArgs(this, state));
Receive();
}
}
else
{
throw new ArgumentException("State is not typeof byte[] or TransferState.");
}
}
因此,如果很难理解:
- 客户端尝试与服务器连接
- 服务器接受客户端
- 客户端先发送缓冲区多长时间
- 客户端开始发送缓冲区(并循环此)
- 服务器读取缓冲区的长度并为其分配缓冲区
- 服务器读取直到缓冲区被读取并调用事件(并循环此过程)
所以我不知道实际发生了什么,但服务器停止接收。虽然连接的客户端仍在发送数据,但几秒钟后ObjectDisposedException
会抛出BeginSend
. 我将如何解决这种情况?(并且拥有可以处理大量流量的稳定服务器。)