3

我在SocketAsyncEventArgs 缓冲区中描述了一个类似的问题,其中我的 SocketAsyncEventArgs UDP 服务器实现接收具有以下内容的数据包:

  1. SocketAsyncEventArgs.BytesTransferred > 0
  2. 里面没有数据SocketAsyncEventArgs.Buffer

这只会在负载下偶尔发生,并且可以在 3 台单独的机器上重现。我想这也是导致我的其他未解决的 SO 问题(取消挂钩事件时检测到 FatalExecutionEngineError)出现问题的原因。

到目前为止,这里是 UDP Server 的实现:

/// <summary>
/// Provides a basic implementation of a UDPSocket based on ISocket
/// </summary>
public class UDPSocket : ISocket
{
    #region "Private Variables"
    private Socket _socket;
    private IBufferManagerProvider _bufferManager;
    #endregion

    #region "Public Properties"
    public Int32 Port { get; private set; }
    public Int32 MessagePrefixLength { get; set; }
    public IPAddress ListeningAddress { get; private set; }
    public ILogProvider LogProvider { get; private set; }
    public bool AllowAddressReuse { get; private set; }
    #endregion

    #region "Constructors"
    private UDPSocket() { }
    public UDPSocket(String listeningAddress) : this(listeningAddress, 4444, null, null) { }
    public UDPSocket(String listeningAddress, Int32 port) : this(listeningAddress, port, null, null) { }
    public UDPSocket(Int32 port) : this("0.0.0.0", port, null, null) { }

    public UDPSocket(String listeningAddress, Int32 port, IBufferManagerProvider manager, ILogProvider logger)
    {
        // Setup the port
        if (port <= 0)
        {
            throw new ArgumentException("Port number cannot be less than 0.");
        }
        else
        {
            this.Port = port;
        }

        // check the ip address
        if (String.IsNullOrEmpty(listeningAddress))
        {
            throw new Exception("The listening address supplied is not valid.");
        }
        this.ListeningAddress = IPAddress.Parse(listeningAddress);

        // check the interfaces
        this.LogProvider = (logger == null) ? new DefaultLogProvider(LogLevel.None) : logger;
        _bufferManager = (manager == null) ? new DefaultBufferManager(100, 2048, null, null) : manager;

        // use a default message prefix
        this.MessagePrefixLength = 4;
    }
    #endregion

    #region "Event Handlers"
    #endregion

    #region "Internal handler methods"
    private void Receive()
    {
        SocketAsyncEventArgs args = _bufferManager.TakeNextSocketAsyncEventArgs();
        byte[] buff = _bufferManager.TakeNextBuffer();
        args.SetBuffer(buff, 0, buff.Length);
        args.Completed += PacketReceived;
        args.RemoteEndPoint = new IPEndPoint(IPAddress.Any, this.Port);

        try
        {
            if (!_socket.ReceiveMessageFromAsync(args))
            {
                OnPacketReceived(args);
            }
        }
        catch
        {
            // we should only jump here when we disconnect all the clients.
        }
    }

    private void PacketReceived(object sender, SocketAsyncEventArgs e)
    {
        OnPacketReceived(e);
    }

    private void OnPacketReceived(SocketAsyncEventArgs e)
    {
        // Start a new Receive operation straight away
        Receive();

        // Now process the packet that we have already
        if (e.BytesTransferred <= MessagePrefixLength)
        {
            // Error condition, empty packet
            this.LogProvider.Log(String.Format("Empty packet received from {0}. Discarding packet.", e.ReceiveMessageFromPacketInfo.Address.ToString()), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
            ReleaseArgs(e);
            return;
        }

        //
        //  The buffer can occassionally be zero'd at this point in time
        //

        // Get the message length from the beginning of the packet.
        byte[] arrPrefix = new byte[MessagePrefixLength];
        Buffer.BlockCopy(e.Buffer, 0, arrPrefix, 0, MessagePrefixLength);
        Int32 messageLength = BitConverter.ToInt32(arrPrefix, 0);

        // the number of bytes remaining to store
        Int32 bytesToProcess = e.BytesTransferred - MessagePrefixLength;

        if (bytesToProcess < messageLength)
        {
            this.LogProvider.Log(String.Format("Missing data from {0}. Discarding packet.", e.ReceiveMessageFromPacketInfo.Address.ToString()), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
            ReleaseArgs(e);
            return;
        }

        // Create a data buffer
        byte[] data = new byte[messageLength];

        // Copy the remaining data to the data buffer on the user token
        Buffer.BlockCopy(e.Buffer, MessagePrefixLength, data, 0, messageLength);

        // Data is safely stored, so unhook the event and return the SocketAsyncEventArgs back to the pool
        ReleaseArgs(e);

        // Thread safe way of triggering the event
        var evnt = OnDataReceived;

        if (evnt != null)
        {
            evnt(e, new SocketDataEventArgs(data));
        }
    }

    private void ReleaseArgs(SocketAsyncEventArgs e)
    {
        e.Completed -= PacketReceived;
        _bufferManager.InsertSocketAsyncEventArgs(e);
        _bufferManager.InsertBuffer(e.Buffer);
    }
    #endregion

    #region "ISocket implicit implementation"
    public void Start()
    {
        this.LogProvider.Log("Starting. Creating socket", "UDPSocket.Start", LogLevel.Verbose);
        _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        _socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.PacketInformation, true);
        _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, this.AllowAddressReuse);
        _socket.Bind(new IPEndPoint(this.ListeningAddress, this.Port));

        // use a default message prefix
        this.MessagePrefixLength = 4;

        // begin receiving packets
        Receive();

        this.LogProvider.Log("Socket created. Listening for packets", "UDPSocket.Start", LogLevel.Verbose);
    }

    public void Stop()
    {
        // do a shutdown before you close the socket
        try
        {
            _socket.Shutdown(SocketShutdown.Both);
            this.LogProvider.Log("Clean socket shutdown", "TCPSocket.CloseSocket", LogLevel.Verbose);
        }
        // throws if socket was already closed
        catch (Exception ex)
        {
            this.LogProvider.Log("Error closing socket - " + ex.Message, "TCPSocket.CloseSocket", LogLevel.Minimal);
        }

        // Close the socket, which calls Dispose internally
        _socket.Close();
        this.LogProvider.Log("Socket closed", "TCPSocket.CloseSocket", LogLevel.Verbose);
    }

    public event EventHandler<SocketDataEventArgs> OnDataReceived;
    #endregion
}

如果需要,我可以制作完整的服务器/客户端演示,并发布其他类,例如缓冲区管理器。这是我第一次使用 SocketAsyncEventArgs,所以我的实现可能不是 100% 正确

4

1 回答 1

0

我想我找到了问题所在。Stack<T>我没有使用 a 来管理缓冲区,而是使用 a来Queue<T>大大减少问题。对于那些感兴趣的人,这是我对缓冲区队列的实现:

/// <summary>
/// Creates a managed Queue that makes a program wait when the resources are depleated
/// </summary>
/// <typeparam name="T">The type of Queue to create</typeparam>
public sealed class ManagedQueue<T>
{
    #region "Constructors"
    public ManagedQueue(Int32 capacity = 400, bool fillQueue = false, Queue<T> queue = null)
    {
        Capacity = capacity;
        _queue = queue ?? new Queue<T>(capacity);
        _restrictor = new SemaphoreSlim((fillQueue) ? Capacity : 0, Capacity);

        if (fillQueue)
        {
            // Setup the queue with default values
            for (Int32 i = 0; i < Capacity; i++)
            {
                Insert(default(T));
            }
        }
    }
    #endregion

    /// <summary>
    /// Gets the defined over all queue Capacity
    /// </summary>
    public Int32 Capacity { get; private set; }

    // The queue to hold the items
    private readonly Queue<T> _queue;

    // The SemaphoreSlim to restrict access to the queue
    private readonly SemaphoreSlim _restrictor;

    /// <summary>
    /// Take the next resource available from the queue. This is a blocking operation if capacity is reached.
    /// </summary>
    /// <returns>The next resource available</returns>
    public T TakeNext()
    {
        // Sanity Check
        if (_queue == null)
        {
            throw new InvalidOperationException("The queue cannot be null");
        }

        // make us wait if necessary
        _restrictor.Wait();

        lock (_queue)
        {
            if (_queue.Count > 0)
            {
                return _queue.Dequeue();
            }
            throw new Exception("There has been a Semaphore/queue offset");
        }
    }

    /// <summary>
    /// Adds an item to the queue. This will release other threads if they are blocked
    /// </summary>
    /// <param name="item"></param>
    public void Insert(T item)
    {
        // Sanity Check
        if (_queue == null)
        {
            throw new InvalidOperationException("The queue cannot be null");
        }

        // Sanity Check
        if (item == null)
        {
            throw new ArgumentException("The item cannot be null");
        }

        lock (_queue)
        {
            _queue.Enqueue(item);
            _restrictor.Release();
        }
    }
}
于 2013-03-07T11:43:35.090 回答