9

我一直在到处寻找有关如何处理 TCP 消息帧的示例。我看到很多例子,其中 NetworkStreams 被传递到 StreamReader 或 StreamWriter 对象,然后使用 ReadLine 或 WriteLine 方法处理 '\n' 分隔的消息。我的应用程序协议包含以 '\n' 结尾的消息,因此 NetworkStream 似乎是要走的路。但是,我找不到任何关于结合异步套接字处理所有这些的正确方法的具体示例。下面调用 ReceiveCallback() 时,如何实现 NetworkStream 和 StreamReader 类来处理消息帧?根据我所阅读的内容,我可能会在一次接收中获得一条消息的一部分,而在下一次接收中获得消息的其余部分(包括'\n')。这是否意味着我可以获得一条消息的结尾和下一条消息的一部分?当然,必须有一种更简单的方法来处理这个问题。

我有以下代码:

    private void StartRead(Socket socket)
    {
        try
        {
            StateObject state = new StateObject();
            state.AsyncSocket = socket;

            socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;

            int bytes_read = state.AsyncSocket.EndReceive(ar);

            char[] chars = new char[bytes_read + 1];
            System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder();
            int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0);

            String data = new String(chars);

            ParseMessage(data);

            StartRead(state.AsyncSocket);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }
4

3 回答 3

3

给块加上一个长度前缀比使用分隔符更好。您无需处理任何类型的转义即可以这种方式使用换行符发送数据。

这个答案现在可能与您无关,因为它使用了AsyncCTP中的功能,这些功能只会出现在 .net 的下一版本中。但是,它确实使事情变得更加简洁。本质上,您编写的代码正是您为同步情况执行的代码,但在有异步调用的地方插入“等待”语句。

    public static async Task<Byte[]> ReadChunkAsync(this Stream me) {
        var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0);
        checked {
            return await me.ReadExactAsync((int)size);
        }
    }

    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) {
        var buf = new byte[count];
        var t = 0;
        while (t < count) {
            var n = await me.ReadAsync(buf, t, count - t);
            if (n <= 0) {
                if (t > 0) throw new IOException("End of stream (fragmented)");
                throw new IOException("End of stream");
            }
            t += n;
        }
        return buf;
    }

    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) {
        me.Write(BitConverter.GetBytes(count), 0, 4);
        me.Write(buffer, offset, count);
    }
于 2011-05-19T15:04:36.363 回答
1

基本上,您创建一个缓冲区,每次接收数据时,将该数据添加到缓冲区并确定您是否已经收到一条或多条完整消息。

在这之间ReceiveCallbackStartRead您将不会收到任何异步消息(传入的数据将自动在套接字级别缓冲),因此它是检查完整消息并将其从缓冲区中删除的理想场所。

所有的变化都是可能的,包括接收消息 1 的结尾,加上消息 2,加上消息 3 的开头,都在一个块中。

我不建议对块进行 UTF8 解码,因为一个 UTF8 字符可能包含两个字节,如果它们在块之间分割,则您的数据可能会损坏。在这种情况下,您可以保留一个 byte[]-buffer ( MemoryStream?) 并在 0x0A 字节上拆分消息。

于 2011-05-19T12:46:25.323 回答
0

好的,这就是我最终要做的。我创建了一个读取器线程,它根据网络流创建一个 NetworkStream 和一个 StreamReader。然后我使用 StreamReader.ReadLine 以这种方式读取行。这是一个同步调用,但它在自己的线程中。它似乎工作得更好。我必须实现它,因为这是我们的应用程序协议(换行符分隔的消息)。我知道其他人会像我所做的那样四处寻找答案,这是我的 Client 类中的相关读取代码:

public class Client
{
    Socket              m_Socket;

    EventWaitHandle     m_WaitHandle;
    readonly object     m_Locker;
    Queue<IEvent>       m_Tasks;
    Thread              m_Thread;

    Thread              m_ReadThread;

    public Client()
    {
        m_WaitHandle = new AutoResetEvent(false);
        m_Locker = new object();
        m_Tasks = new Queue<IEvent>();

        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true;
        m_Thread.Start();
    }

    public void EnqueueTask(IEvent task)
    {
        lock (m_Locker)
        {
            m_Tasks.Enqueue(task);
        }

        m_WaitHandle.Set();
    }

    private void Run()
    {
        while (true)
        {
            IEvent task = null;

            lock (m_Locker)
            {
                if (m_Tasks.Count > 0)
                {
                    task = m_Tasks.Dequeue();

                    if (task == null)
                    {
                        return;
                    }
                }
            }

            if (task != null)
            {
                task.DoTask(this);
            }
            else
            {
                m_WaitHandle.WaitOne();
            }
        }
    }

    public void Connect(string hostname, int port)
    {
        try
        {
            m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            IPAddress[] IPs = Dns.GetHostAddresses(hostname);

            m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket);
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;

            socket.EndConnect(ar);

            OnConnect(true, "Successfully connected to server.");

            m_ReadThread = new Thread(new ThreadStart(this.ReadThread));
            m_ReadThread.Name = "Read Thread";
            m_ReadThread.IsBackground = true;
            m_ReadThread.Start();
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    void ReadThread()
    {
        NetworkStream networkStream = new NetworkStream(m_Socket);
        StreamReader reader = new StreamReader(networkStream);

        while (true)
        {
            try
            {
                String message = reader.ReadLine();

                // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received.
                EnqueueTask(new ServerMessageEvent(message));
            }
            catch (IOException)
            {
                // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown...
                Disconnect();
                break;
            }
        }
    }

    ... Code for sending/parsing the message in the Client class thread.
}
于 2011-05-19T22:47:32.593 回答