47

我是一位经验丰富的 C# 开发人员,但到目前为止我还没有开发过 TCP 服务器应用程序。现在我必须开发一个高度可扩展和高性能的服务器,它可以处理至少 5-10,000 个并发连接:通过 GPRS 从 GPS 设备获取原始字节数据。

一个常见的通信过程应该是这样的:

  • GPS 设备启动与我的服务器的连接
  • 如果我想获取数据,我的服务器会回答
  • 设备发送 GPS 数据
  • 我的服务器向设备发送有关获取它的报告(例如校验和)
  • 从 GPS、reportm 获取新数据,这种情况一次又一次地发生
  • 稍后 GPS DEVICE 关闭连接

所以,在我的服务器中,我需要

  • 跟踪已连接/活动的客户端
  • 从服务器端关闭任何客户端
  • 捕获事件,当设备关闭连接时
  • 获取字节数据
  • 向客户端发送数据

我开始在互联网上阅读有关此主题的信息,但这对我来说似乎是一场噩梦。有很多方法,但我找不到最好的方法。

异步套接字方法对我来说似乎是最好的,但是以这种异步风格编写代码很糟糕,而且不容易调试。

所以我的问题是:您认为在 C# 中实现高性能 TCP 服务器的最佳方式是什么?你知道有什么好的开源组件可以做到这一点吗?(我尝试了几个,但我找不到一个好的。)

4

5 回答 5

42

它必须是异步的,没有办法解决这个问题。高性能和可扩展性不能与每个套接字一个线程混在一起。您可以查看 StackExchange 自己在做什么,查看async Redis await BookSleeve,它利用了下一个 C# 版本的 CTP 功能(因此处于边缘并且可能会发生变化,但它很酷)。对于更前沿的解决方案,解决方案围绕利用SocketAsyncEventArgs 类发展,通过消除与“经典”C# 异步处理相关的异步处理程序的频繁分配,使事情更进一步:

SocketAsyncEventArgs 类是 System.Net.Sockets.Socket 类的一组增强功能的一部分,这些增强功能提供了一种可供专门的高性能套接字应用程序使用的替代异步模式。此类专为需要高性能的网络服务器应用程序而设计。应用程序可以专门或仅在目标热点区域(例如,当接收大量数据时)使用增强异步模式。

长话短说:学习异步或死去尝试......

顺便说一句,如果你问为什么异步,那么请阅读这篇文章中链接的三篇文章:高性能 Windows 程序。最终答案是:底层操作系统设计需要它。

于 2011-05-16T21:02:00.140 回答
13

正如 Remus 上面所说,您必须使用异步来保持高性能。这就是 .NET 中的 Begin.../End... 方法。

在套接字的底层,这些方法利用了 IO 完成端口,这似乎是在 Windows 操作系统上处理许多套接字的最高效的方式。

正如 Jim 所说,TcpClient 类在这里可以提供帮助并且非常易于使用。这是一个使用 TcpListener 监听传入连接并使用 TcpClient 处理它们的示例,初始 BeginAccept 和 BeginRead 调用是异步的。

此示例确实假设在套接字上使用了基于消息的协议,并且除了每次传输的前 4 个字节是长度外,该协议被省略,但随后允许您在流上使用同步读取来获取其余数据那已经被缓冲了。

这是代码:

class ClientContext
{
    public TcpClient Client;
    public Stream Stream;
    public byte[] Buffer = new byte[4];
    public MemoryStream Message = new MemoryStream();
}

class Program
{
    static void OnMessageReceived(ClientContext context)
    {
        // process the message here
    }

    static void OnClientRead(IAsyncResult ar)
    {
        ClientContext context = ar.AsyncState as ClientContext;
        if (context == null)
            return;

        try
        {
            int read = context.Stream.EndRead(ar);
            context.Message.Write(context.Buffer, 0, read);

            int length = BitConverter.ToInt32(context.Buffer, 0);
            byte[] buffer = new byte[1024];
            while (length > 0)
            {
                read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length));
                context.Message.Write(buffer, 0, read);
                length -= read;
            }

            OnMessageReceived(context);
        }
        catch (System.Exception)
        {
            context.Client.Close();
            context.Stream.Dispose();
            context.Message.Dispose();
            context = null;
        }
        finally
        {
            if (context != null)
                context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
    }

    static void OnClientAccepted(IAsyncResult ar)
    {
        TcpListener listener = ar.AsyncState as TcpListener;
        if (listener == null)
            return;

        try
        {
            ClientContext context = new ClientContext();
            context.Client = listener.EndAcceptTcpClient(ar);
            context.Stream = context.Client.GetStream();
            context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
        finally
        {
            listener.BeginAcceptTcpClient(OnClientAccepted, listener);
        }
    }

    static void Main(string[] args)
    {
        TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000));
        listener.Start();

        listener.BeginAcceptTcpClient(OnClientAccepted, listener);

        Console.Write("Press enter to exit...");
        Console.ReadLine();
        listener.Stop();
    }
}

它演示了如何处理异步调用,但需要添加错误处理以确保 TcpListener 始终接受新连接,并在客户端意外断开连接时进行更多错误处理。此外,似乎确实存在一些情况,并非所有数据都一次性到达,也需要处理。

于 2011-06-09T14:11:57.120 回答
2

您可以使用TcpClient类执行此操作,但说实话我不知道您是否可以拥有 10000 个打开的套接字。这是相当多的。但我经常TcpClient用来处理几十个并发套接字。而且异步模型实际上很好用。

你最大的问题不会是TcpClient工作。有 10,000 个并发连接,我认为带宽和可扩展性将成为问题。我什至不知道一台机器是否可以处理所有这些流量。我想这取决于数据包有多大以及它们进入的频率。但是在你承诺在一台计算机上实现这一切之前,你最好做一些粗略的估计。

于 2011-05-16T21:02:47.437 回答
2

我认为您也在寻找 UDP 技术。对于 10k 客户端,它很快,但问题是您必须为收到消息的每条消息实施确认。在 UDP 中,您不需要为每个客户端打开一个套接字,但需要在 x 秒后实现心跳/ping 机制来检查哪个客户端已连接。

于 2012-09-11T11:36:20.570 回答
1

您可以使用我制作的 TCP CSharpServer,实现起来非常简单,只需在您的一个类上实现 IClientRequest 即可。

using System;
using System.Collections.Generic;
using System.Linq;

namespace cSharpServer
{
    public interface IClientRequest
    {        
        /// <summary>
        /// this needs to be set, otherwise the server will not beable to handle the request.
        /// </summary>
        byte IdType { get; set; } // This is used for Execution.
        /// <summary>
        /// handle the process by the client.
        /// </summary>
        /// <param name="data"></param>
        /// <param name="client"></param>
        /// <returns></returns>
        byte[] Process(BinaryBuffer data, Client client);
    }
}

BinaryBuffer 让你读取发送到服务器的数据真的很容易。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;

namespace cSharpServer
{
    public class BinaryBuffer
    {
        private const string Str0001 = "You are at the End of File!";
        private const string Str0002 = "You are Not Reading from the Buffer!";
        private const string Str0003 = "You are Currenlty Writing to the Buffer!";
        private const string Str0004 = "You are Currenlty Reading from the Buffer!";
        private const string Str0005 = "You are Not Writing to the Buffer!";
        private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!";
        private bool _inRead;
        private bool _inWrite;
        private List<byte> _newBytes;
        private int _pointer;
        public byte[] ByteBuffer;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public override string ToString()
        {
            return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(string data)
            : this(Helper.DefaultEncoding.GetBytes(data))
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer()
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(byte[] data)
            : this(ref data)
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(ref byte[] data)
        {
            ByteBuffer = data;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementPointer(int add)
        {
            if (add < 0)
            {
                throw new Exception(Str0006);
            }
            _pointer += add;
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int GetPointer()
        {
            return _pointer;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(ref byte[] buffer)
        {
            return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(byte[] buffer)
        {
            return GetString(ref buffer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginWrite()
        {
            if (_inRead)
            {
                throw new Exception(Str0004);
            }
            _inWrite = true;

            _newBytes = new List<byte>();
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(float value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.Add(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(int value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }

            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(long value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = new byte[8];

            unsafe
            {
                fixed (byte* bytePointer = byteArray)
                {
                    *((long*)bytePointer) = value;
                }
            }

            _newBytes.AddRange(byteArray);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int UncommitedLength()
        {
            return _newBytes == null ? 0 : _newBytes.Count;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void WriteField(string value)
        {
            Write(value.Length);
            Write(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(string value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
            _newBytes.AddRange(byteArray);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(decimal value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            int[] intArray = decimal.GetBits(value);

            Write(intArray[0]);
            Write(intArray[1]);
            Write(intArray[2]);
            Write(intArray[3]);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetInt(int value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetLong(long value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte[] value)
        {
            Write(ref value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(ref byte[] value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndWrite()
        {
            if (ByteBuffer != null)
            {
                _newBytes.InsertRange(0, ByteBuffer);
            }
            ByteBuffer = _newBytes.ToArray();
            _newBytes = null;
            _inWrite = false;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndRead()
        {
            _inRead = false;
            _pointer = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginRead()
        {
            if (_inWrite)
            {
                throw new Exception(Str0003);
            }
            _inRead = true;
            _pointer = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte ReadByte()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
            return ByteBuffer[_pointer++];
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int ReadInt()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(4))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 4;

            return BitConverter.ToInt32(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float[] ReadFloatArray()
        {
            float[] dataFloats = new float[ReadInt()];
            for (int i = 0; i < dataFloats.Length; i++)
            {
                dataFloats[i] = ReadFloat();
            }
            return dataFloats;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float ReadFloat()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(sizeof(float)))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += sizeof(float);

            return BitConverter.ToSingle(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public decimal ReadDecimal()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(16))
            {
                throw new Exception(Str0001);
            }
            return new decimal(new[] { ReadInt(),
                ReadInt(),
                ReadInt(),
                ReadInt()
            });
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public long ReadLong()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(8))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 8;

            return BitConverter.ToInt64(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public string ReadString(int size)
        {
            return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte[] ReadByteArray(int size)
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(size))
            {
                throw new Exception(Str0001);
            }
            byte[] newBuffer = new byte[size];

            Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size);

            _pointer += size;

            return newBuffer;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool EofBuffer(int over = 1)
        {
            return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length);
        }
    }
}

完整项目位于 GitHub CSharpServer

于 2015-09-09T04:53:58.293 回答