0

我在使用 SocketAsyncEventArgs 接收套接字数据包排序时遇到问题。我的问题的症结在于,当客户端向服务器发送数据包时,服务器将以非标准大小的片段接收数据包,并且它们将按随机顺序进行处理。这显然意味着数据包无法被我的应用程序解码,从而搞砸了整个对话。

例如,客户端将使用 Socket.NetworkStream.Write() 方法发送一个完整的数据包:

 [-------PACKET-------]

服务器使用 SocketAsyncEventArgs 将在两个单独的数据包上接收异步回调,但将首先处理数据包的最后一个块:

 First packet:  ET-------]
 Second packet: [-----PACK--

这不会发生在所有数据包上,我无法根据数据包大小或时间准确地重现它。我实现了一个发送/确认通信协议,以便客户端不会发送另一个数据包,直到服务器确认最后一个数据包已成功接收,所以它不可能是我超载的服务器。

最令人沮丧的部分是服务器上的 Socket.Available始终为零,根据文档,这意味着没有任何内容可供读取

 If you are using a non-blocking Socket, Available is a good way to determine whether data is
 queued for reading, before calling Receive. The available data is the total amount of data
 queued in the network buffer for reading. If no data is queued in the network buffer, 
 Available returns 0.

可用为零时,SocketEventArgs.Count 似乎没有提供任何价值,并且偏移量与接收缓冲区有关,而不是基于实际数据流的位置我不确定如何将这些片段放入命令。

我猜这个问题是数据包第一部分的异步回调被第二个回调抢占了,该回调完全处理然后返回到第一部分。问题是我无法同步锁定整个回调(希望 .NET 有像 Java 这样的同步函数)。即使我这样做了,这似乎首先会否定异步回调的好处。

我做错了什么导致这些顺序错误,或者我可以做些什么来正确处理它们?

4

1 回答 1

0

从你的一些陈述中我不太确定你在做什么。你写你正在使用SocketAsyncEventArgs但正在尝试处理奇怪的 API 东西,比如.Countor .Available。如果您的套接字类型是 TCP,那么您可能做错了什么,因为数据包总是按正确的顺序排列。它们甚至可能被分割成一个字节大小的块,但顺序是正确的。这几乎就是 TCP 的全部内容。

由于您没有提供代码并且根据您的陈述,我想最好只为您提供一些SSCE来帮助您入门。

该示例使用 C#,但应适用于 VB.net。检查代码中的注释以查看从何处正确获取接收到的数据。该实现会将接收到的数据写入控制台并将其发送回客户端。Echo 服务器制作了很棒的样本!

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace SaeaSample
{
    public class Program
    {
        static void Main()
        {
            var server = new Server(new IPEndPoint(IPAddress.Any, 12345));

            // ugly sample clients
            Parallel.For(0, 4, i =>
            {
                using (var client = new TcpClient("localhost", 12345))
                using (var stream = client.GetStream())
                using (var writer = new BinaryWriter(stream))
                using (var reader = new BinaryReader(stream))
                {
                    var text = "Hello Async-Server!";
                    var message = Encoding.UTF8.GetBytes(text);
                    Console.WriteLine("s: {0}: {1}", i, text);
                    writer.Write(message);
                    var roundtrip = reader.ReadBytes(message.Length);
                    Console.WriteLine("r: {0}: {1}", i, Encoding.UTF8.GetString(roundtrip));
                }
            });

            Console.ReadLine();
        }
    }

    public class Server
    {
        private const int readBufferSize = 8192;
        private const int sendBufferSize = readBufferSize;

        // just have a fixed number of clients instead of
        // pooling for the sake of being an example
        private const int maxClients = 4;
        private const int maxQueue = 10;

        private readonly byte[] buffer = new byte[maxClients * (readBufferSize + sendBufferSize)];
        private readonly Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        public Server(IPEndPoint localEndPoint)
        {
            socket.Bind(localEndPoint);
            socket.Listen(maxQueue);

            for (int i = 0; i < maxClients; i++)
            {
                var client = new UserToken(i);
                client.RecvArgs.Completed += completed;
                client.SendArgs.Completed += completed;

                Console.WriteLine("accepting on client slot {0}", client.Slot);

                if (!socket.AcceptAsync(client.RecvArgs))
                {
                    completed(this, client.RecvArgs);
                }
            }
        }

        private void completed(object sender, SocketAsyncEventArgs e)
        {
            var client = (UserToken)e.UserToken;

            // socket operation had success
            if (e.SocketError == SocketError.Success)
            {
                // new client connected
                if (e.LastOperation == SocketAsyncOperation.Accept)
                {
                    onAccept(client);
                }
                // either send or received worked
                else if (e.BytesTransferred > 0)
                {
                    if (e.LastOperation == SocketAsyncOperation.Receive)
                    {
                        onReceived(client);
                    }
                    else if (e.LastOperation == SocketAsyncOperation.Send)
                    {
                        onSend(client);
                    }
                    // should never happen, handle gracefully
                    else
                    {
                        onOther(client);
                    }
                }
                // don't handle anything else
                else
                {
                    onOther(client);
                }
            }
            // socket error occured
            else
            {
                onOther(client);
            }
        }

        private void onAccept(UserToken client)
        {
            Console.WriteLine("client slot {0} connected client from {1}", client.Slot, client.RecvArgs.AcceptSocket.RemoteEndPoint);

            // once accepted, start receiving
            client.RecvArgs.SetBuffer(buffer, client.Slot * (readBufferSize + sendBufferSize), readBufferSize);

            if (!client.RecvArgs.AcceptSocket.ReceiveAsync(client.RecvArgs))
            {
                completed(this, client.RecvArgs);
            }
        }

        private void onReceived(UserToken client)
        {
            // echo whatever we got
            var builder = new StringBuilder();

            // here is the important part
            for (int i = 0; i < client.RecvArgs.BytesTransferred; i++)
            {
                // offset the buffer and echo in hex
                builder.Append(client.RecvArgs.Buffer[client.Slot * (readBufferSize + sendBufferSize) + i].ToString("x2"));
            }
            Console.WriteLine("received {0} bytes from client slot {1}: {2}", client.RecvArgs.BytesTransferred, client.Slot, builder.ToString());

            // send data back ... this is an echo server after all
            client.SendArgs.SetBuffer(client.RecvArgs.Buffer, client.Slot * (readBufferSize + sendBufferSize) + readBufferSize, client.RecvArgs.BytesTransferred);
            Buffer.BlockCopy(client.RecvArgs.Buffer, client.RecvArgs.Offset, client.SendArgs.Buffer, client.SendArgs.Offset, client.RecvArgs.BytesTransferred);
            if (!client.RecvArgs.AcceptSocket.SendAsync(client.SendArgs))
            {
                completed(this, client.SendArgs);
            }
        }

        private void onSend(UserToken client)
        {
            Console.WriteLine("sent {0} bytes back to client slot {1}", client.SendArgs.BytesTransferred, client.Slot);

            // start receiving again
            if (!client.RecvArgs.AcceptSocket.ReceiveAsync(client.RecvArgs))
            {
                completed(this, client.RecvArgs);
            }
        }

        private void onOther(UserToken client)
        {
            Console.WriteLine("disconnecting client slot {0}", client.Slot);

            // just close the connection and accept again
            client.RecvArgs.SetBuffer(null, 0, 0);
            if (client.RecvArgs.AcceptSocket != null) {
                client.RecvArgs.AcceptSocket.Dispose();
                client.RecvArgs.AcceptSocket = null;
            }

            Console.WriteLine("accepting on client slot {0}", client.Slot);

            if (!socket.AcceptAsync(client.RecvArgs))
            {
                completed(this, client.RecvArgs);
            }
        }
    }

    public class UserToken
    {
        public readonly int Slot;
        public readonly SocketAsyncEventArgs RecvArgs = new SocketAsyncEventArgs();
        public readonly SocketAsyncEventArgs SendArgs = new SocketAsyncEventArgs();

        public UserToken(int slot)
        {
            Slot = slot;
            RecvArgs.UserToken = this;
            SendArgs.UserToken = this;
        }
    }
}

另请注意,由于此代码是异步的,因此控制台输出可能始终按顺序排列,也可能不按顺序排列。您可以将读取和写入缓冲区大小常量从 8192 一直减小到 1。数据包将在两个方向上逐字节发送,但肯定会按顺序发送。

对于一些更深入的解释,MSDN始终是一个很好的起点。

于 2015-03-20T07:22:35.507 回答