为什么不使用异步套接字?这是一些代码:
// Server socket: accepts TCP clients, reads framed messages from them
// asynchronously and queues the deserialized messages for the controller.
class ControllerSocket : Socket, IDisposable
{
    // MessageQueue queues messages to be processed by the controller.
    // This class locks on the queue instance before enqueueing.
    public Queue<MessageBase> messageQueue = null;
    // This is a list of all attached clients.
    // It is touched from socket callbacks as well as from Broadcast/Dispose,
    // so this class locks on the list instance for every read or write.
    public List<Socket> clients = new List<Socket>();

    #region Events
    // Event definitions handled in the controller
    public delegate void SocketConnectedHandler(Socket socket);
    public event SocketConnectedHandler SocketConnected;
    public delegate void DataRecievedHandler(Socket socket, int bytesRead);
    public event DataRecievedHandler DataRecieved;
    public delegate void DataSentHandler(Socket socket, int length);
    public event DataSentHandler DataSent;
    public delegate void SocketDisconnectedHandler();
    public event SocketDisconnectedHandler SocketDisconnected;
    #endregion

    #region Constructor
    // Creates the message queue and binds the server socket to the given
    // port on an address of the local host.
    public ControllerSocket(int port)
        : base(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    {
        // Create the message queue
        this.messageQueue = new Queue<MessageBase>();
        // Acquire the host address and port, then bind the server socket
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = SelectIPv4Address(ipHostInfo);
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port);
        this.Bind(localEndPoint);
    }
    #endregion

    // Picks the first IPv4 address of the host entry, because this socket is
    // created with AddressFamily.InterNetwork and cannot bind an IPv6 address.
    // Falls back to the first listed address when no IPv4 address exists.
    private static IPAddress SelectIPv4Address(IPHostEntry hostEntry)
    {
        foreach (IPAddress candidate in hostEntry.AddressList)
        {
            if (candidate.AddressFamily == AddressFamily.InterNetwork)
            {
                return candidate;
            }
        }
        return hostEntry.AddressList[0];
    }

    // Starts accepting client connections
    public void StartListening()
    {
        this.Listen(100);
        this.BeginAccept(AcceptCallback, this);
    }

    // Completes an accept, registers the new client, starts reading from it
    // and then queues the next accept.
    private void AcceptCallback(IAsyncResult ar)
    {
        Socket listener = (Socket)ar.AsyncState;
        Socket socket;
        try
        {
            socket = listener.EndAccept(ar);
        }
        catch (ObjectDisposedException)
        {
            // The listening socket was closed: stop accepting.
            return;
        }
        catch (SocketException)
        {
            // This particular accept failed; keep serving other clients.
            AcceptNext(listener);
            return;
        }

        try
        {
            // Add the connected client to the list
            lock (clients)
            {
                clients.Add(socket);
            }
            // Notify any event handlers
            SocketConnectedHandler connectedHandler = SocketConnected;
            if (connectedHandler != null)
            {
                connectedHandler(socket);
            }
            // Create an initial state object to hold buffer and socket details
            StateObject state = new StateObject();
            state.workSocket = socket;
            state.BufferSize = 8192;
            // Begin asynchronous read
            socket.BeginReceive(state.Buffer, 0, state.BufferSize, 0,
                ReadCallback, state);
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
        }
        catch (ObjectDisposedException)
        {
            HandleClientDisconnect(socket);
        }
        finally
        {
            // Listen for more client connections
            AcceptNext(listener);
        }
    }

    // Queues the next asynchronous accept. The socket is already listening,
    // so only BeginAccept is needed here (no second Listen call).
    private void AcceptNext(Socket listener)
    {
        try
        {
            listener.BeginAccept(AcceptCallback, listener);
        }
        catch (ObjectDisposedException)
        {
            // The listening socket was closed: nothing more to accept.
        }
        catch (SocketException e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    // Read data from the client
    private void ReadCallback(IAsyncResult ar)
    {
        StateObject state = (StateObject)ar.AsyncState;
        Socket socket = state.workSocket;
        try
        {
            if (!socket.Connected)
            {
                HandleClientDisconnect(socket);
                return;
            }
            // Read the socket
            int bytesRead = socket.EndReceive(ar);
            if (bytesRead == 0)
            {
                // A zero-byte read means the client closed the connection.
                // Re-arming the read here would complete immediately again
                // and spin forever.
                HandleClientDisconnect(socket);
                return;
            }
            // Deserialize objects
            foreach (MessageBase msg in MessageBase.Receive(socket, bytesRead, state))
            {
                // Add objects to the message queue
                lock (this.messageQueue)
                {
                    messageQueue.Enqueue(msg);
                }
            }
            // Notify any event handlers
            DataRecievedHandler receivedHandler = DataRecieved;
            if (receivedHandler != null)
            {
                receivedHandler(socket, bytesRead);
            }
            // Asynchronously read more client data
            socket.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
                ReadCallback, state);
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
        }
        catch (ObjectDisposedException)
        {
            // The client socket was closed while a read was pending.
            HandleClientDisconnect(socket);
        }
    }

    // Send data to a specific client
    public void Send(Socket socket, MessageBase msg)
    {
        try
        {
            // Serialize the message
            byte[] bytes = msg.Serialize();
            if (socket.Connected)
            {
                // Send the message
                socket.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, socket);
            }
            else
            {
                HandleClientDisconnect(socket);
            }
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
        }
        catch (ObjectDisposedException)
        {
            HandleClientDisconnect(socket);
        }
    }

    // Broadcast data to all clients
    public void Broadcast(MessageBase msg)
    {
        try
        {
            // Serialize the message
            byte[] bytes = msg.Serialize();
            // Work on a snapshot: HandleClientDisconnect removes entries from
            // the client list, which must not happen while enumerating it.
            Socket[] snapshot;
            lock (clients)
            {
                snapshot = clients.ToArray();
            }
            // Process all clients
            foreach (Socket client in snapshot)
            {
                try
                {
                    // Send the message
                    if (client.Connected)
                    {
                        client.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, client);
                    }
                    else
                    {
                        HandleClientDisconnect(client);
                    }
                }
                catch (SocketException)
                {
                    HandleClientDisconnect(client);
                }
                catch (ObjectDisposedException)
                {
                    HandleClientDisconnect(client);
                }
            }
        }
        catch (Exception e)
        {
            // Serialization error (or a failure raised by an event handler)
            Console.WriteLine(e.ToString());
        }
    }

    // Data sent to a client socket
    private void SendCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;
            if (socket.Connected)
            {
                // Complete sending the data
                int bytesSent = socket.EndSend(ar);
                // Notify any attached handlers
                DataSentHandler sentHandler = DataSent;
                if (sentHandler != null)
                {
                    sentHandler(socket, bytesSent);
                }
            }
            else
            {
                HandleClientDisconnect(socket);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    // Shuts down and closes a client socket and removes it from the list.
    // SocketDisconnected is raised only when the client was still in the
    // list, so overlapping callbacks for one client notify once.
    private void HandleClientDisconnect(Socket client)
    {
        try
        {
            // Client socket may have shutdown unexpectedly
            if (client.Connected)
            {
                client.Shutdown(SocketShutdown.Both);
            }
        }
        catch (SocketException)
        {
            // The connection is already broken; closing below is enough.
        }
        catch (ObjectDisposedException)
        {
            // Already closed by another code path.
        }
        // Close the socket and remove it from the list
        client.Close();
        bool removed;
        lock (clients)
        {
            removed = clients.Remove(client);
        }
        // Notify any event handlers
        if (removed)
        {
            SocketDisconnectedHandler disconnectedHandler = SocketDisconnected;
            if (disconnectedHandler != null)
            {
                disconnectedHandler();
            }
        }
    }

    // Close all client connections, then release the listening socket.
    public void Dispose()
    {
        Socket[] snapshot;
        lock (clients)
        {
            snapshot = clients.ToArray();
        }
        foreach (Socket client in snapshot)
        {
            try
            {
                if (client.Connected)
                {
                    client.Shutdown(SocketShutdown.Receive);
                }
            }
            catch (SocketException)
            {
                // The connection is already broken; just close it.
            }
            catch (ObjectDisposedException)
            {
                // Already closed.
            }
            client.Close();
        }
        // Release the listening socket as well. base.Dispose(true) is called
        // rather than Close() so this method cannot end up calling itself
        // through the IDisposable implementation declared on this class.
        base.Dispose(true);
        GC.SuppressFinalize(this);
    }
}
// Client socket: connects to the server, reads framed messages from it
// asynchronously and queues the deserialized messages for the controller.
class ReceiverSocket : Socket
{
    // MessageQueue queues messages to be processed by the controller.
    // This class locks on the queue instance before enqueueing.
    public Queue<MessageBase> messageQueue = null;

    #region Events
    // Event definitions handled in the controller
    public delegate void SocketConnectedHandler(Socket socket);
    public event SocketConnectedHandler SocketConnected;
    public delegate void DataRecievedHandler(Socket socket, MessageBase msg);
    public event DataRecievedHandler DataRecieved;
    public delegate void DataSentHandler(Socket socket, int length);
    public event DataSentHandler DataSent;
    public delegate void SocketDisconnectedHandler();
    public event SocketDisconnectedHandler SocketDisconnected;
    private IPEndPoint remoteEP = null;
    #endregion

    #region Constructor
    // Creates the message queue and resolves the endpoint to connect to:
    // an address of the local host combined with the given port.
    public ReceiverSocket(int port)
        : base(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    {
        // Create the message queue
        messageQueue = new Queue<MessageBase>();
        // Acquire the host address and port
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        remoteEP = new IPEndPoint(SelectIPv4Address(ipHostInfo), port);
    }
    #endregion

    // Picks the first IPv4 address of the host entry, because this socket is
    // created with AddressFamily.InterNetwork and cannot connect to an IPv6
    // endpoint. Falls back to the first listed address when none is IPv4.
    private static IPAddress SelectIPv4Address(IPHostEntry hostEntry)
    {
        foreach (IPAddress candidate in hostEntry.AddressList)
        {
            if (candidate.AddressFamily == AddressFamily.InterNetwork)
            {
                return candidate;
            }
        }
        return hostEntry.AddressList[0];
    }

    // Connect to the server
    public void Connect()
    {
        this.BeginConnect(remoteEP, ConnectCallback, this);
    }

    // Completes a connection attempt. On success the first read is started;
    // on failure the attempt is repeated after a five second pause.
    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket client = (Socket)ar.AsyncState;
            bool connected;
            try
            {
                // Every BeginConnect has to be completed with EndConnect;
                // a failed attempt is reported as a SocketException here.
                client.EndConnect(ar);
                connected = client.Connected;
            }
            catch (SocketException)
            {
                connected = false;
            }

            if (connected)
            {
                // Create an initial state object to hold buffer and socket details
                StateObject state = new StateObject();
                state.workSocket = client;
                state.BufferSize = 8192;
                // Notify any event handlers
                SocketConnectedHandler connectedHandler = SocketConnected;
                if (connectedHandler != null)
                {
                    connectedHandler(client);
                }
                // Begin asynchronous read
                client.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
                    ReceiveCallback, state);
            }
            else
            {
                // Wait before retrying (this blocks the callback thread).
                Thread.Sleep(5000);
                Connect();
            }
        }
        catch (Exception)
        {
            // Close the socket and notify handlers
            Reconnect();
        }
    }

    // Read data from the server
    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;
            Socket client = state.workSocket;
            if (!client.Connected)
            {
                Reconnect();
                return;
            }
            // Read the socket
            int bytesRead = client.EndReceive(ar);
            if (bytesRead == 0)
            {
                // A zero-byte read means the server closed the connection.
                // Re-arming the read here would complete immediately again
                // and spin forever.
                Reconnect();
                return;
            }
            // Deserialize objects
            foreach (MessageBase msg in MessageBase.Receive(client, bytesRead, state))
            {
                // Add objects to the message queue
                lock (this.messageQueue)
                {
                    this.messageQueue.Enqueue(msg);
                }
            }
            // Notify any event handlers. The message argument is always null:
            // handlers are expected to take the messages from messageQueue.
            DataRecievedHandler receivedHandler = DataRecieved;
            if (receivedHandler != null)
            {
                receivedHandler(client, null);
            }
            // Asynchronously read more server data
            client.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
                ReceiveCallback, state);
        }
        catch (SocketException)
        {
            // Close the socket and notify handlers
            Reconnect();
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed while a read was pending.
            Reconnect();
        }
    }

    // Serializes a message and sends it to the server asynchronously.
    public void Send(MessageBase msg)
    {
        try
        {
            // Serialize the message
            byte[] bytes = msg.Serialize();
            if (this.Connected)
            {
                // Send the message
                this.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, this);
            }
            else
            {
                Reconnect();
            }
        }
        catch (SocketException)
        {
            // Close the socket and notify handlers
            Reconnect();
        }
        catch (Exception ex)
        {
            // Still best-effort (e.g. serialization failures do not reach
            // the caller), but the failure is now recorded instead of lost.
            System.Diagnostics.Trace.TraceError(ex.ToString());
        }
    }

    // Completes an asynchronous send and reports the number of bytes sent.
    private void SendCallback(IAsyncResult ar)
    {
        try
        {
            Socket client = (Socket)ar.AsyncState;
            if (client.Connected)
            {
                // Complete sending the data
                int bytesSent = client.EndSend(ar);
                // Notify any event handlers
                DataSentHandler sentHandler = DataSent;
                if (sentHandler != null)
                {
                    sentHandler(client, bytesSent);
                }
            }
            else
            {
                Reconnect();
            }
        }
        catch (SocketException)
        {
            Reconnect();
        }
        catch (ObjectDisposedException)
        {
            Reconnect();
        }
    }

    // Despite its name this method does not reconnect: it disconnects and
    // closes the socket and raises SocketDisconnected. It never throws.
    private void Reconnect()
    {
        try
        {
            try
            {
                // Disconnect the original socket
                if (this.Connected)
                {
                    this.Disconnect(true);
                }
            }
            catch (SocketException)
            {
                // The connection is already broken; closing below is enough.
            }
            catch (ObjectDisposedException)
            {
                // Already closed by another code path.
            }
            this.Close();
            // Notify any event handlers
            SocketDisconnectedHandler disconnectedHandler = SocketDisconnected;
            if (disconnectedHandler != null)
            {
                disconnectedHandler();
            }
        }
        catch (Exception e)
        {
            System.Diagnostics.Trace.TraceError(e.ToString());
        }
    }
}
// Per-connection receive state: the socket being read, the receive buffer
// and the offset at which the next read should continue writing.
public class StateObject
{
    // Backing storage for the Buffer / BufferSize properties
    private byte[] bytes;
    private int capacity;

    // Socket this state belongs to
    public Socket workSocket;
    // Offset in Buffer where the next socket read starts
    public int readOffset;
    public StringBuilder sb = new StringBuilder();

    // Receive buffer; null until BufferSize has been assigned
    public byte[] Buffer
    {
        get { return bytes; }
    }

    // Size of the receive buffer. Assigning a value allocates a fresh,
    // zero-filled buffer of that size; the previous contents are not copied.
    public int BufferSize
    {
        get { return capacity; }
        set
        {
            capacity = value;
            bytes = new byte[capacity];
        }
    }
}
您需要做的就是插入您自己的消息。
请记住,套接字流可能(而且在大多数情况下确实会)包含不完整的消息。因此,最好在每条消息的开头先发送消息的长度(下面的代码使用 8 字节的消息头:4 字节的长度加上加密标志)。您还必须相应地管理读取缓冲区,在多次读取之间把不完整的消息拼接起来。请参考下面的消息基类。
// Base class for messages exchanged over the sockets.
// Wire format of one message: an 8-byte header (Int32 payload length at
// offset 0, Boolean "encrypted" flag at offset 4, remaining header bytes
// unused) followed by the payload, which is the deflate-compressed
// BinaryFormatter image of the message, optionally encrypted.
public partial class MessageBase
{
    // Number of header bytes that precede every payload
    private const int FrameHeaderSize = 8;

    // Virtual Execute method following the Command pattern
    public virtual string Execute(Socket socket) { return string.Empty; }

    // Whether the serialized payload of this message is encrypted
    protected virtual bool MustEncrypt
    {
        get { return false; }
    }

    // Binary serialization: returns the header followed by the payload.
    public byte[] Serialize()
    {
        byte[] payload;
        using (MemoryStream stream = new MemoryStream())
        {
            // leaveOpen = true so the MemoryStream can be read after the
            // DeflateStream has been disposed (which completes the data).
            using (DeflateStream ds = new DeflateStream(stream, CompressionMode.Compress, true))
            {
                BinaryFormatter formatter = new BinaryFormatter();
                formatter.Serialize(ds, this);
                ds.Flush();
            }
            payload = stream.ToArray();
        }
        if (this.MustEncrypt)
        {
            payload = RijndaelEncrptor.Instance.Encrypt(payload);
        }
        // Create a buffer large enough to hold the header and the payload
        byte[] data = new byte[FrameHeaderSize + payload.Length];
        // Add the message size and the encryption flag
        BitConverter.GetBytes(payload.Length).CopyTo(data, 0);
        BitConverter.GetBytes(this.MustEncrypt).CopyTo(data, 4);
        // Add the message data
        payload.CopyTo(data, FrameHeaderSize);
        return data;
    }

    // Deserializes one complete message (header + payload) that starts at
    // the beginning of the buffer. Returns null when the payload cannot be
    // decoded.
    public static MessageBase Deserialize(byte[] buffer)
    {
        int length = BitConverter.ToInt32(buffer, 0);
        bool mustDecrypt = BitConverter.ToBoolean(buffer, 4);
        MessageBase b = null;
        try
        {
            b = MessageBase.Deserialize(buffer, FrameHeaderSize, length, mustDecrypt);
        }
        catch (Exception)
        {
            // Deliberately best-effort: an undecodable payload yields null.
        }
        return b;
    }

    // Deserializes a payload of the given length located at the given
    // offset of the buffer. Returns null when the formatter fails or the
    // object is not a MessageBase.
    public static MessageBase Deserialize(byte[] buffer, int offset, int length, bool mustDecrypt)
    {
        // Copy the payload out of the input buffer
        byte[] data = new byte[length];
        Buffer.BlockCopy(buffer, offset, data, 0, length);
        // Decrypt message
        if (mustDecrypt)
        {
            data = RijndaelEncrptor.Instance.Decrypt(data);
        }
        // Deserialize the binary data into a new object of type MessageBase
        using (MemoryStream stream = new MemoryStream(data))
        {
            using (DeflateStream ds = new DeflateStream(stream, CompressionMode.Decompress, false))
            {
                // SECURITY NOTE(review): BinaryFormatter must not be used on
                // data from untrusted peers; it can instantiate arbitrary
                // types. Left in place because it defines the wire format.
                BinaryFormatter formatter = new BinaryFormatter();
                try
                {
                    return formatter.Deserialize(ds) as MessageBase;
                }
                catch
                {
                    return null;
                }
            }
        }
    }

    // Splits the bytes buffered in state.Buffer into messages.
    // bytesReceived is the size of the latest read; state.readOffset is the
    // number of unprocessed bytes kept from the previous call. Every
    // complete message is yielded (null for one that cannot be decoded).
    // An incomplete trailing message is moved to the start of the buffer
    // and state.readOffset is set to its size, so the next socket read
    // appends to it. Throws InvalidDataException for an invalid length.
    // The work is done lazily, while the result is being enumerated.
    public static IEnumerable<MessageBase> Receive(Socket client, int bytesReceived, StateObject state)
    {
        // Total buffered count is the bytes received this read
        // plus any unprocessed bytes from the last receive
        int bufferLen = bytesReceived + state.readOffset;
        // Reset the next read offset in the case
        // this receive lands on a message boundary
        state.readOffset = 0;
        // Current read position
        int readOffset = 0;
        while (readOffset < bufferLen)
        {
            int available = bufferLen - readOffset;
            if (available < FrameHeaderSize)
            {
                // Only part of a header has arrived: keep it for the next
                // read instead of parsing stale bytes as a header.
                CarryOver(state, readOffset, available, FrameHeaderSize);
                yield break;
            }
            // Get the message size and the encryption flag
            int length = BitConverter.ToInt32(state.Buffer, readOffset);
            bool mustDecrypt = BitConverter.ToBoolean(state.Buffer, readOffset + 4);
            // The length comes from the peer: reject values that are
            // negative or would overflow the offset arithmetic below.
            if (length < 0 || length > int.MaxValue - FrameHeaderSize - readOffset)
            {
                throw new InvalidDataException("Invalid message length in header: " + length);
            }
            int frameSize = FrameHeaderSize + length;
            if (available < frameSize)
            {
                // Incomplete message: keep what has arrived and make sure
                // the buffer can hold the whole message.
                CarryOver(state, readOffset, available, frameSize);
                yield break;
            }
            yield return MessageBase.Deserialize(state.Buffer, readOffset + FrameHeaderSize, length, mustDecrypt);
            // Move past the header and the payload
            readOffset += frameSize;
        }
    }

    // Moves count unprocessed bytes starting at start to the beginning of
    // the receive buffer, growing the buffer to requiredSize first when it
    // is smaller, and records count as the offset for the next socket read.
    private static void CarryOver(StateObject state, int start, int count, int requiredSize)
    {
        byte[] source = state.Buffer;
        if (source.Length < requiredSize)
        {
            // Assigning BufferSize allocates a new array; "source" still
            // refers to the old one, so its contents can be copied across.
            state.BufferSize = requiredSize;
        }
        // Buffer.BlockCopy copes with overlapping ranges, which is the case
        // when source and destination are the same array.
        Buffer.BlockCopy(source, start, state.Buffer, 0, count);
        state.readOffset = count;
    }
}
上面的代码应该可以帮助您。