2

我看不出池化的 SocketAsyncEventArgs样式如何帮助我减少服务于许多并发连接的服务器的内存消耗。

是的,它提供了 MS 的 Begin/End 样式的替代方案,上述 MSDN 页面描述为需要a System.IAsyncResult object be allocated for each asynchronous socket operation.

最初的研究让我相信,出于某种原因,它最多只允许我分配少数字节数组,并在我的数千个并发连接的客户端之间共享它们。

但似乎如果我想在数千个客户端连接上等待数据,我必须调用ReceiveAsync数千次,每次都提供一个不同的字节数组(包装在 SocketAsyncEventArgs 中),然后这数千个数组将坐在那里直到客户端决定发送的时间,很可能是 10 秒。

因此,除非我在客户端发送数据时调用 ReceiveAsync(或者在那之后,依赖于一些网络堆栈缓冲区?)——这是由客户端自行决定且服务器无法预测的,否则我很不走运,字节数组将坐在那里,无所事事地等待客户动他的屁股。

我希望用一个字节数组(或者每个侦听线程一个数组,如果并行化有意义的话)来监听那些数千个连接,并且一旦这些连接中的任何一个发送了一些东西(这确实必须进入一些网络堆栈缓冲区首先无论如何),它将被复制到该数组中,我的侦听器被调用,一旦侦听器完成,该数组就可以被重用。

Socket.*Async() 方法确实不可能吗?

.net 的套接字库有可能实现这样的事情吗?

4

3 回答 3

1

多个套接字操作不能共享相同的内存(或者如果您收到未定义的结果)。

您可以通过首先读取 1 个字节来规避此问题。当读取完成时,可能会有更多数据出现。因此,对于下一次读取,您使用更有效的大小,例如 4KB(或者您询问DataAvailable属性 - 这大约是该属性的唯一有效用例)。

于 2015-04-08T13:03:30.350 回答
0

MSDN 文章解释了池的工作原理。本质上:

a) 如果有可用的池实例,则使用它,否则创建一个新实例。

b) 完成后将实例返回到池中,以便重复使用。

最终,池大小将增长以适应所有请求,或者您可以将池配置为具有最大实例计数,并在有实例请求时阻塞,达到最大池大小,并且池当前为空。此策略可防止池以不受控制的方式增长。

于 2015-04-08T13:07:44.367 回答
0

这是一个实现的草图,它结合了 usr 的出色byte[1]解决方法建议,并展示了如何在不牺牲性能的情况下将有些繁琐的Socket.xxxAsync方法完全隐藏在 a中。SimpleAsyncSocket

一个简单的异步回显服务器使用SimpleAsyncSocket可能如下所示。

readonly static Encoding Enc = new UTF8Encoding(false);
SimpleAsyncSocket _simpleSocket;

void StartEchoServer(Socket socket)
{
    _simpleSocket = new SimpleAsyncSocket(socket, OnSendCallback,
        _receiveBufferPool, OnReceiveCallback);
}

bool OnReceiveCallback(SimpleAsyncSocket socket,
    ArraySegment<byte> bytes)
{
    var str = Enc.GetString(bytes.Array, bytes.Offset, bytes.Count);
    _simpleSocket.SendAsync(new ArraySegment<byte>(Enc.GetBytes(str)));
    return false;
}

void OnSendCallback(SimpleAsyncSocket asyncSocket,
    ICollection<ArraySegment<byte>> collection, SocketError arg3)
{
    var bytes = collection.First();
    var str = Enc.GetString(bytes.Array, bytes.Offset, bytes.Count);
}

这是实现的草图:

class SimpleAsyncSocket
{
    private readonly Socket _socket;
    private readonly Pool<byte[]> _receiveBufferPool;
    private readonly SocketAsyncEventArgs _recvAsyncEventArgs;
    private readonly SocketAsyncEventArgs _sendAsyncEventArgs;
    private readonly byte[] _waitForReceiveEventBuffer = new byte[1];
    private readonly Queue<ArraySegment<byte>> _sendBuffers = new Queue<ArraySegment<byte>>();

    public SimpleAsyncSocket(Socket socket, Action<SimpleAsyncSocket, ICollection<ArraySegment<byte>>, SocketError> sendCallback,
        Pool<byte[]> receiveBufferPool, Func<SimpleAsyncSocket, ArraySegment<byte>, bool> receiveCallback)
    {
        if (socket == null) throw new ArgumentNullException("socket");
        if (sendCallback == null) throw new ArgumentNullException("sendCallback");
        if (receiveBufferPool == null) throw new ArgumentNullException("receiveBufferPool");
        if (receiveCallback == null) throw new ArgumentNullException("receiveCallback");

        _socket = socket;

        _sendAsyncEventArgs = new SocketAsyncEventArgs();
        _sendAsyncEventArgs.UserToken = sendCallback;
        _sendAsyncEventArgs.Completed += SendCompleted;

        _receiveBufferPool = receiveBufferPool;
        _recvAsyncEventArgs = new SocketAsyncEventArgs();
        _recvAsyncEventArgs.UserToken = receiveCallback;
        _recvAsyncEventArgs.Completed += ReceiveCompleted;
        _recvAsyncEventArgs.SetBuffer(_waitForReceiveEventBuffer, 0, 1);
        ReceiveAsyncWithoutTheHassle(_recvAsyncEventArgs);
    }

    public void SendAsync(ArraySegment<byte> buffer)
    {
        lock (_sendBuffers)
            _sendBuffers.Enqueue(buffer);
        StartOrContinueSending();
    }
    private void StartOrContinueSending(bool calledFromCompleted = false)
    {
        lock (_waitForReceiveEventBuffer) // reuse unrelated object for locking
        {
            if (!calledFromCompleted && _sendAsyncEventArgs.BufferList != null)
                return; // still sending
            List<ArraySegment<byte>> buffers = null;
            lock (_sendBuffers)
            {
                if (_sendBuffers.Count > 0)
                {
                    buffers = new List<ArraySegment<byte>>(_sendBuffers);
                    _sendBuffers.Clear();
                }
            }
            _sendAsyncEventArgs.BufferList = buffers; // nothing left to send
            if (buffers == null)
                return;
        }

        if (!_socket.SendAsync(_sendAsyncEventArgs))
            // Someone on stackoverflow claimed that invoking the Completed
            // handler synchronously might end up blowing the stack, which
            // does sound possible. To avoid that guy finding my code and
            // downvoting me for it (and maybe just because it's the right
            // thing to do), let's leave the call stack via the ThreadPool
            ThreadPool.QueueUserWorkItem(state => SendCompleted(this, _sendAsyncEventArgs));
    }
    private void SendCompleted(object sender, SocketAsyncEventArgs args)
    {
        switch (args.LastOperation)
        {
            case SocketAsyncOperation.Send:
                {
                    try
                    {
                        var bytesTransferred = args.BytesTransferred;
                        var sendCallback = (Action<SimpleAsyncSocket, ICollection<ArraySegment<byte>>, SocketError>)args.UserToken;
                        // for the moment, I believe the following commented-out lock is not
                        // necessary, but still have to think it through properly
                        // lock (_waitForReceiveEventBuffer) // reuse unrelated object for locking
                        {
                            sendCallback(this, args.BufferList, args.SocketError);
                        }
                        StartOrContinueSending(true);
                    }
                    catch (Exception e)
                    {
                        args.BufferList = null;
                        // todo: log and disconnect
                    }


                    break;
                }
            case SocketAsyncOperation.None:
                break;
            default:
                throw new Exception("Unsupported operation: " + args.LastOperation);
        }
    }
    private void ReceiveCompleted(object sender, SocketAsyncEventArgs args)
    {
        switch (args.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                {
                    var bytesTransferred = args.BytesTransferred;
                    var buffer = args.Buffer;
                    if (args.BytesTransferred == 0) // remote end closed connection
                    {
                        args.SetBuffer(null, 0, 0);
                        if (buffer != _waitForReceiveEventBuffer)
                            _receiveBufferPool.Return(buffer);

                        // todo: disconnect event
                        return;
                    }
                    if (buffer == _waitForReceiveEventBuffer)
                    {
                        if (args.BytesTransferred == 1)
                        {
                            // we received one byte, there's probably more!
                            var biggerBuffer = _receiveBufferPool.Take();
                            biggerBuffer[0] = _waitForReceiveEventBuffer[0];
                            args.SetBuffer(biggerBuffer, 1, biggerBuffer.Length - 1);
                            ReceiveAsyncWithoutTheHassle(args);
                        }
                        else
                            throw new Exception("What the heck");
                    }
                    else
                    {
                        var callback = (Func<SimpleAsyncSocket, ArraySegment<byte>, bool>)args.UserToken;
                        bool calleeExpectsMoreDataImmediately = false;
                        bool continueReceiving = false;
                        try
                        {
                            var count = args.Offset == 1
                                            // we set the first byte manually from _waitForReceiveEventBuffer
                                            ? bytesTransferred + 1
                                            : bytesTransferred;
                            calleeExpectsMoreDataImmediately = callback(this, new ArraySegment<byte>(buffer, 0, count));
                            continueReceiving = true;
                        }
                        catch (Exception e)
                        {
                            // todo: log and disconnect
                        }
                        finally
                        {
                            if (!calleeExpectsMoreDataImmediately)
                            {
                                args.SetBuffer(_waitForReceiveEventBuffer, 0, 1);
                                _receiveBufferPool.Return(buffer);
                            }
                        }
                        if (continueReceiving)
                            ReceiveAsyncWithoutTheHassle(args);
                    }
                    break;
                }
            case SocketAsyncOperation.None:
                break;
            default:
                throw new Exception("Unsupported operation: " + args.LastOperation);
        }
    }

    private void ReceiveAsyncWithoutTheHassle(SocketAsyncEventArgs args)
    {
        if (!_socket.ReceiveAsync(args))
            // Someone on stackoverflow claimed that invoking the Completed
            // handler synchronously might end up blowing the stack, which
            // does sound possible. To avoid that guy finding my code and
            // downvoting me for it (and maybe just because it's the right
            // thing to do), let's leave the call stack via the ThreadPool
            ThreadPool.QueueUserWorkItem(state => ReceiveCompleted(this, args));
    }
}
于 2015-04-09T04:23:56.023 回答