2

我最近遇到了一个问题,尝试使用 SendAsync 方法广播到打开的 WebSocket,收到 InvalidOperationException 消息“发送操作已经在进行中”

挖掘 WebSocket 类的源代码,内部会跟踪忙碌状态:

internal ChannelState _sendState;

// Represents the state of a single channel; send and receive have their own states
internal enum ChannelState {
    Ready, // this channel is available for transmitting new frames
    Busy, // the channel is already busy transmitting frames
    Closed // this channel has been closed
}

理想情况下,如果我要广播对 WebSocket 连接的更新,我想提前知道它很忙并执行一些其他处理(例如排队消息)。

这个状态被标记为内部状态似乎很奇怪——如果它是一个公共财产,我可以简单地检查一下

context.WebSocket.SendState == ChannelState.Ready

WebSocket 上的 SendAsync 的正确模式是什么,可以防止引发此异常?

我不愿意通过反射来破解对该属性的访问。

编辑以澄清:

WebSocket.State 属性无助于这种情况。该属性使用此枚举:

public enum WebSocketState
{
    None,
    Connecting,
    Open,
    CloseSent,
    CloseReceived,
    Closed,
    Aborted
}

一旦打开套接字连接,无论它是否忙于发送,此语句都将评估为“true”:

context.WebSocket.State == WebSocketState.Open
4

1 回答 1

1

我已经实施了一个似乎可以满足我需求的解决方案。

基本问题是当两个线程试图在同一个 WebSocket 上发送时。

我的解决方案有几个部分,并且取决于它在 AspNetWebSocketContect 下运行的事实,因此我可以使用“Items”字典来存储有关当前连接的属性。

  1. “发送”属性用于跟踪 WebSocket 是否忙。
  2. “队列”属性是要发送的 ArraySegments 列表。
  3. 锁定 WebSocket 对象并同步运行 Send 方法。
  4. 发送完成后处理队列中的任何项目。
  5. 在方法开始时屈服以防止阻塞。

这是我目前在开发环境中使用的代码——我将监视它以查看它的扩展程度:

/// <summary>
/// Send a message to a specific client.
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer)
{
    // Return control to the calling method immediately.
    await Task.Yield();

    // Make sure we have data.
    if (buffer.Count == 0)
        return;

    // The state of the connection is contained in the context Items dictionary.
    bool sending;
    lock (context)
    {
        // Are we already in the middle of a send?
        bool.TryParse(context.Items["sending"]?.ToString(), out sending);

        // If not, we are now.
        if (!sending)
            context.Items["sending"] = true;
    }

    if (!sending)
    {
        // Lock with a timeout, just in case.
        if (!Monitor.TryEnter(context.WebSocket, 1000))
        {
            // If we couldn't obtain exclusive access to the socket in one second, something is wrong.
            await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
            return;
        }

        try
        {
            // Send the message synchronously.
            var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
            t.Wait();
        }
        finally
        {
            Monitor.Exit(context.WebSocket);
        }

        // Note that we've finished sending.
        lock (context)
        {
            context.Items["sending"] = false;
        }

        // Handle any queued messages.
        await HandleQueue(context);
    }
    else
    {
        // Add the message to the queue.
        lock (context)
        {
            var queue = context.Items["queue"] as List<ArraySegment<byte>>;
            if (queue == null)
                context.Items["queue"] = queue = new List<ArraySegment<byte>>();
            queue.Add(buffer);
        }
    }
}

/// <summary>
/// If there was a message in the queue for this particular web socket connection, send it.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private static async Task HandleQueue(AspNetWebSocketContext context)
{
    var buffer = new ArraySegment<byte>();
    lock (context)
    {
        // Check for an item in the queue.
        var queue = context.Items["queue"] as List<ArraySegment<byte>>;
        if (queue != null && queue.Count > 0)
        {
            // Pull it off the top.
            buffer = queue[0];
            queue.RemoveAt(0);
        }
    }

    // Send that message.
    if (buffer.Count > 0)
        await SendMessage(context, buffer);
}

我对这种方法的一些考虑:

  1. 我相信最好锁定一个简单的对象而不是像我上面所做的那样更复杂的对象。我不确定使用“context”和“context.WebSocket”对象进行锁定的后果是什么。
  2. 理论上我不需要锁定 WebSocket,因为我已经在测试“发送”属性。但是,测试导致 WebSocket 在几秒钟的重负载下变得无响应。一旦我实施了锁定模式,这种情况就消失了。
  3. 我通过同一个 WebSocket 连接对 10 个并发线程(每个发送 1000 条消息)进行了测试。每条消息都有编号,因此我可以将它们记录到客户那里,并且每条消息都通过了。所以队列系统似乎起作用了。
于 2016-04-28T03:47:20.320 回答