基本上,我正在为一个设计得非常快速、小巧且健壮的系统编写一些代码。我从一些异步示例开始,TcpListener
并TcpClient
编写了一个服务器和客户端类,该类基本上在我的项目中的多个地方使用。
基本上我的服务器类(代码将在后面)都是基于事件的,客户端代码也是如此。当我收到数据包 - 一个接一个 - 通过服务器或客户端套接字进入时 - 一切正常。
但是,如果发送方(例如 A 类使用客户端类)通过 TCP 流将一堆数据包发送到 B 类中的服务器类。当然,服务器类可能会将所有数据包作为一个大的大包来获取。所以当数据接收事件的回调发生时,我抓取缓冲区然后处理它。
这就是有趣的事情发生的地方。我的问题不是将所有数据包从大缓冲区中拆分出来。我的问题是由于某种我无法理解的原因。假设我从客户端向服务器发送了 5 个数据包(反之亦然),而另一方得到了所有 5 个数据包。触发了 datarecieve 事件,然后抓住了窃听器,所有 5 个数据包数据包在那里。它们被处理。但随后事件再次触发..
换句话说,不是事件触发一次,而是针对 5 个单独的数据包触发 5 次,我最终处理了包含这 5 个数据包的缓冲区 5 次。
由于我正在设计一个分布式网络,这意味着模块与之通信的节点(模块(客户端类)<-->节点(服务器类)<-->客户端(客户端类))获得25个数据包而不是5个. 然后它将这些转发到目的地,得到 25*5 或 125 个数据包。
我很确定我在这里遗漏了一些明显的东西。我已经尝试过让事件只触发一次的方法。我最终可能会认输并重写服务器和客户端类,以便它们是同步的并且每个客户端实例(或服务器端)都有一个线程,一个接受的线程,以及每个客户端连接一个线程)——这样我就可以更好地处理数据流。即数据包进来,如果它的整个过程。如果不是,请等待它完整,等等。使用典型的开始/结束特殊字节等等。
服务器类 - 大部分都在那里。去掉了一些不相关的,比如 KillClient 等。
public class Server
{
private TcpListener serverListener;
private List<ServerClient> clients;
#region Callbacks
public delegate void incomingDataCallback(byte[] buffer, string clientID, TcpClient tcpClient);
public incomingDataCallback incomingData = null;
public delegate void incomingConnectionCallback(string clientID, TcpClient tcpClient);
public incomingConnectionCallback incomingConnection = null;
public delegate void connectionClosedCallback(string clientID, TcpClient tcpClient);
public connectionClosedCallback connectionClosed = null;
public delegate void dataWrittenCallback(string clientID, TcpClient tcpClient);
public dataWrittenCallback dataWritten = null;
#endregion
// Constructor
public Server(string listenIP, int listenPort)
{
// Create a new instance of serverlistener.
serverListener = new TcpListener(IPAddress.Parse(listenIP), listenPort);
this.clients = new List<ServerClient>();
this.Encoding = Encoding.Default;
}
~Server()
{
// Shut down the server.
this.Stop();
}
public Encoding Encoding { get; set; }
public IEnumerable<TcpClient> TcpClients
{
get
{
foreach (ServerClient client in this.clients)
{
yield return client.TcpClient;
}
}
}
public IEnumerable<TcpClient> TcpClients
{
get
{
foreach (ServerClient client in this.clients)
{
yield return client.TcpClient;
}
}
}
public void Stop()
{
this.serverListener.Stop();
lock (this.clients)
{
foreach (ServerClient client in this.clients)
{
client.TcpClient.Client.Disconnect(false);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
}
this.clients.Clear();
}
}
public void WriteToClient(TcpClient tcpClient, byte[] bytes)
{
NetworkStream networkStream = tcpClient.GetStream();
try
{
networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, tcpClient);
}
catch (System.IO.IOException ex)
{
// Port was closed before data could be written.
// So remove this guy from clients.
lock (this.clients)
{
foreach (ServerClient cl in clients)
{
if (cl.TcpClient.Equals(tcpClient))
{
this.clients.Remove(cl);
if (connectionClosed != null)
connectionClosed(cl.ID, cl.TcpClient);
break;
}
}
}
}
}
private void WriteCallback(IAsyncResult result)
{
TcpClient tcpClient = result.AsyncState as TcpClient;
NetworkStream networkStream = tcpClient.GetStream();
networkStream.EndWrite(result);
// Get the ID and return it
//ServerClient client = result.AsyncState as ServerClient;
//string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();
Console.WriteLine("Write callback called for: " + port);
// if (dataWritten != null)
// dataWritten(client.ID, tcpClient);
}
private void AcceptTcpClientCallback(IAsyncResult result)
{
TcpClient tcpClient;
try
{
tcpClient = serverListener.EndAcceptTcpClient(result);
}
catch
{
// Often get this error when shutting down the server
return;
}
NetworkStream networkStream = tcpClient.GetStream();
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
// Get the IP Address.. this will be used for id purposes.
string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();
// Create a client object for this client.
ServerClient client = new ServerClient(tcpClient, buffer, ipaddr + ":" + port);
Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
// Lock the list and add it in.
lock (this.clients)
{
this.clients.Add(client);
}
if (networkStream.DataAvailable)
{
int read = networkStream.Read(client.Buffer, 0, client.Buffer.Length);
Console.WriteLine("Calling ReadHandle directly with " + read.ToString() + " number of bytes. for clientid: " + client.ID);
ReadHandle(client, read, networkStream);
}
else
{
Console.WriteLine("Started beginRead for client in accept connection: " + client.ID);
networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
//networkStream.
Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
}
Console.WriteLine("Starting BeginAcceptTcpClient again - client: " + client.ID);
serverListener.BeginAcceptTcpClient(AcceptTcpClientCallback, null);
// Notify owner that new connection came in
if (incomingConnection != null)
incomingConnection(client.ID, tcpClient);
}
private void ReadCallback(IAsyncResult result)
{
ServerClient client = result.AsyncState as ServerClient;
if (client == null)
{
Console.WriteLine("ReadCallback: Null client");
return;
}
int read = 0;
NetworkStream networkStream = client.NetworkStream;
try
{
read = networkStream.EndRead(result);
}
catch (System.IO.IOException ex)
{
Console.WriteLine("ReadCallback: Exception occured during reading.. Message: " + ex.Message + " client " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}
ReadHandle(client, read, networkStream);
}
private void ReadHandle(ServerClient client, int read, NetworkStream networkStream)
{
// If zero bytes read, then client disconnected.
if (read == 0)
{
Console.WriteLine("ReadHandle: Read == 0, closing connection for Client: " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}
//string data = this.Encoding.GetString(client.Buffer, 0, read);
// Do something with the data object here.
if (incomingData != null)
incomingData(client.Buffer, client.ID, client.TcpClient);
// Go back to accepting data from client.
try
{
networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
Console.WriteLine("ReadHandle: BeginRead called for client " + client.ID);
}
catch (Exception ex)
{
// Damn, we just lost the client.
Console.WriteLine("ReadHandle: Exception occured during trying to BeginRead.. Message: " + ex.Message + " client " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}
}
}
internal class ServerClient
{
public ServerClient(TcpClient tcpClient, byte[] buffer, string ipaddr)
{
if (tcpClient == null) throw new ArgumentNullException("tcpClient");
if (buffer == null) throw new ArgumentNullException("tcpClient");
if (ipaddr == null) throw new ArgumentNullException("tcpClient");
this.TcpClient = tcpClient;
this.Buffer = buffer;
this.ID = ipaddr;
}
public TcpClient TcpClient { get; private set; }
public byte[] Buffer { get; private set; }
public string ID { get; private set; }
public NetworkStream NetworkStream
{
get
{
return TcpClient.GetStream();
}
}
}
}
这是客户端类 - 与服务器相比,它更小更简单。
public class Client
{
private IPAddress address;
private int port;
private string ID;
//private WaitHandle addressSet;
private TcpClient tcpClient;
private int failedConnectionCount;
public bool keepOnTrying = false;
#region Callbacks
public delegate void incomingDataCallback(byte[] buffer, string serverID);
public incomingDataCallback incomingData = null;
public delegate void connectedCallback(string serverID);
public connectedCallback clientConnected = null;
public delegate void connectionFailedCallback(string serverID);
public connectionFailedCallback clientConnectionFailed = null;
public delegate void connectionClosedCallback(string serverID);
public connectionClosedCallback connectionClosed = null;
public delegate void dataWrittenCallback(string serverID);
public dataWrittenCallback dataWritten = null;
#endregion
public Client(IPAddress address, int port)
{
this.address = address;
if (port < 0) throw new ArgumentException();
this.port = port;
this.tcpClient = new TcpClient();
this.Encoding = Encoding.Default;
this.ID = address.ToString() + ":" + port.ToString();
tcpClient.ReceiveBufferSize = 16384;
tcpClient.SendBufferSize = 16384;
}
// Destructor
~Client()
{
this.Disconnect();
}
public Encoding Encoding { get; set; }
public void Connect()
{
tcpClient.BeginConnect(address, port, ConnectCallback, null);
}
public void Disconnect()
{
tcpClient.Close();
if (connectionClosed != null)
connectionClosed(ID);
}
public void Write(byte[] bytes)
{
NetworkStream networkStream = tcpClient.GetStream();
networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, null);
}
private void WriteCallback(IAsyncResult result)
{
NetworkStream networkStream = tcpClient.GetStream();
if (tcpClient.Connected)
{
networkStream.EndWrite(result);
}
if (dataWritten != null)
dataWritten(ID);
}
private void ConnectCallback(IAsyncResult result)
{
// Check to see if connected successfully or not. If we didnt, then the try/catch block will increment
// the failed connection count.
try
{
tcpClient.EndConnect(result);
}
catch
{
Interlocked.Increment(ref failedConnectionCount);
if (keepOnTrying)
tcpClient.BeginConnect(address, port, ConnectCallback, null);
if (clientConnectionFailed != null)
clientConnectionFailed(ID);
return;
}
// Connected successfully.
// Now begin async read operation.
NetworkStream networkStream = tcpClient.GetStream();
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
if (clientConnected != null)
clientConnected(ID);
}
private void ReadCallback(IAsyncResult result)
{
int read;
NetworkStream networkStream;
try
{
networkStream = tcpClient.GetStream();
read = networkStream.EndRead(result);
}
catch
{
// An error has occured when reading.. -.-
Console.WriteLine("Error occured while reading for ID: " + ID);
return;
}
// If read is 0, then connection was closed
if (read == 0)
{
if (connectionClosed != null)
connectionClosed(ID);
return;
}
if (result.IsCompleted == false)
{
Console.WriteLine("Uh oh ");
}
byte[] buffer = result.AsyncState as byte[];
if (incomingData != null)
incomingData(buffer, ID);
// Then begin reading again.
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
}
}
我使用这些类的方式是这样的:
- 创建一个类,然后创建一个服务器或客户端的对象。
- 绑定所有回调。即在你的类中为每个回调创建函数。
- 调用服务器启动或客户端连接。取决于您使用的是哪个。
因此,要复制我的问题,请执行以下操作:
- 在一个程序中创建服务器类,在另一个程序中创建客户端。让客户端连接到服务器。
- 对传入的数据进行回调。我使用序列化,所以你可以做类似的事情。
- 让客户端一次向服务器发送一堆数据。对我来说,我在模块中将 JSON 数据转换为我自己的格式,然后将其发送到服务器。所以服务器一次得到一堆数据包。
- 应该看到 - 如果速度足够快 - 服务器会将所有数据包放入接收缓冲区,并且每次调用incomingDataCallback 时 - 您将拥有一个包含所有数据包的缓冲区。它会为收到的每个数据包调用它。不是字节,整个数据包。
因此,在我将代码重写为同步并在线程中运行之前:
- 有什么我可以做的不同/更好的事情,以便在数据进入时做到这一点 - 要么它调用一次事件,我就可以处理缓冲区中的所有数据包 - 或者 -
- 有没有办法确保被调用的任何其他事件不会与初始事件共享相同的缓冲区?我知道这是浪费处理器时间——但我可以在我的incomingDataCallback 处理程序中有一个“如果前10 个字节是00,则返回”行。这就是为什么我想在第一个事件中使缓冲区全部无效并在后续事件中检测这些缓冲区。
更新:由于 Servy 的评论 - 这是我使用这些类的方式。不是 c/p'ing 一切,只是相关部分。
节点 - 使用服务器类。
class ModuleClient
{
private List<ModuleClientInfo> clients = new List<ModuleClientInfo>();
private Server myServer = null;
public ModuleClient()
{
// create a server object
myServer = new Server("127.0.0.1", 9000);
// Attach callbacks
myServer.connectionClosed = connClosed;
myServer.dataWritten = dataWritten;
myServer.incomingConnection = incomingConn;
myServer.incomingData = incomingData;
}
public void startListeningForModules()
{
if (!listeningForModules)
myServer.Start();
else
return;
listeningForModules = true;
}
private void incomingData(byte[] buffer, string clientID, TcpClient tcpClient)
{
Console.WriteLine("Incoming Data from " + clientID);
incomingPacketStruct newPacket = new incomingPacketStruct();
newPacket.clientID = clientID;
newPacket.buffer = buffer;
newPacket.tcpClient = tcpClient;
}
在incomingData 中,我注意到缓冲区中有5 个数据包,然后incomingData 被调用了5 次。
现在至于客户端的incomingData(请记住,我没有注意到传出数据中的这种行为,也没有相关性。假设我一次得到10个json数据包,我会将它们发送到节点 - 所以这是10次写入。节点会将它们全部放在同一个缓冲区中,然后它将调用服务器的传入数据 10 次,每次都会看到 10 个数据包。
客户端传入数据:
public partial class Program : ServiceBase
{
// Globals
private static SocketObject.Client myHermesClient = null;
private static JSONInterface myJsonInterface = null;
private static void mainThread(object data)
{
// Take care of client and callbacks..
myHermesClient = new SocketObject.Client(System.Net.IPAddress.Parse("127.0.0.1"), 9000);
myHermesClient.connectionClosed = hermesConnectionClosed;
myHermesClient.clientConnected = hermesConnected;
myHermesClient.dataWritten = hermesDataWritten;
myHermesClient.incomingData = hermesIncomingData;
myHermesClient.clientConnectionFailed = hermesConnectionFailed;
myHermesClient.keepOnTrying = true;
// Begin async connect
myHermesClient.Connect();
// Main loop for service.
while (serviceRunning)
{
Thread.Sleep(500);
}
}
#region Hermes Client Code
private static void hermesIncomingData(byte[] buffer, string serverID)
{
}
再说一次,同样的事情。当服务器将大量数据发送回客户端时。如果你打破并查看缓冲区,你会明白我在说什么。
现在,要澄清这一点。我的问题不是分解数据包。我有代码(不包括在内,因为专有,与此无关 - 它不修改缓冲区,仅从中创建对象列表) - 但问题是回调被多次调用,如上所述。