0

我都赞成多线程和异步任务,但放开控制不是我的一杯茶。

我正在做一个项目,其中一个进程通过 websockets 与多个目标进行通信,并且发送显然应该尽可能快,但必须相互同步。考虑将视频流同时传输到多个设备。因此,我不想在目标接收到第一帧之前继续发送下一帧。

使用.net WebSockets时,我已经尝试过使用HttpListner和Kestrel实现(我猜它或多或少相同),然后socket.SendAsync似乎在发送之前接收数据并在本地缓冲它,返回TaskCompleted 在实际发送之前完成。这意味着发送任务基本上是瞬间完成的。

我当然“可以”实现一个 ACK​​ 模式,接收者在收到整个数据包时发送一个 ACK​​,但这本身会减慢进程,我想避免这种情况,因为系统本身已经知道这一点,因为它确实会尝试连续发送排队的包裹。

这基本上是负责发送数据的代码,以 2048b 为单位:

Log.Info($"Send {sendBytes.Length}b");
if (socket!.State == WebSocketState.Open)
{
    await socket.SendAsync(
        sendBuffer,
        WebSocketMessageType.Binary,
        endOfMessage: true,
        cancellationToken: cts
        );
}
Log.Info($"Done "); 

我会假设这里的 await 实际上会等待传输完成,这是不做的,但它似乎在等待任务完成。

在接收方,这是处理数据,前后有调试响应:

wsClientSession.send("REC " + (String)payloadLength + "b");
ProcessFrame(payload, payloadLength);
wsClientSession.send("ACK " + (String)payloadLength + "b");

发送方的日志显示如下:

  • 线程 28 是发送线程,它完成得非常快。它需要发送 6198 个字节并将其分成 4 次传输。这显然是在第一个数据包离开服务器之前完成的。
  • 其余线程是调试响应的异步接收者。
  • (大小差异是由于数据的加密)
2022-02-07 16:03:34.8887 threadid:28 # Info # Send 6198b to 5d8263c6-909c-47a1-b4d0-57d74d6a4665
2022-02-07 16:03:34.8887 threadid:28 # Info # Send 2048b
2022-02-07 16:03:34.8887 threadid:28 # Info # Done
2022-02-07 16:03:34.8887 threadid:28 # Info # Send 2048b
2022-02-07 16:03:34.8887 threadid:28 # Info # Done
2022-02-07 16:03:34.8887 threadid:28 # Info # Send 2048b
2022-02-07 16:03:34.8887 threadid:28 # Info # Done
2022-02-07 16:03:34.8887 threadid:28 # Info # Send 128b
2022-02-07 16:03:34.8887 threadid:28 # Info # Done
2022-02-07 16:03:34.8887 threadid:28 # Info # Send done to 5d8263c6-909c-47a1-b4d0-57d74d6a4665
2022-02-07 16:03:35.0024 threadid:7 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - REC 2030b
2022-02-07 16:03:35.0090 threadid:7 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - ACK 2030b
2022-02-07 16:03:35.0090 threadid:17 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - REC 2030b
2022-02-07 16:03:35.0260 threadid:33 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - ACK 2030b
2022-02-07 16:03:35.0260 threadid:33 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - REC 2030b
2022-02-07 16:03:35.0260 threadid:30 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - ACK 2030b
2022-02-07 16:03:35.0373 threadid:17 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - REC 108b
2022-02-07 16:03:35.0373 threadid:4 # Info # 5d8263c6-909c-47a1-b4d0-57d74d6a4665 - ACK 108b

现在,我对这个 WebSocket 实现的理解全错了吗?是否不可能对数据发送的完整性进行回调/事件或类似的操作?我可以通过其他方式实现这一目标吗?同步执行这个真正的同步将是一个解决方案,但我似乎无法强制这样做。

啊!

4

1 回答 1

1

我正在做一个项目,其中一个进程通过 websockets 与多个目标进行通信,并且发送显然应该尽可能快,但必须相互同步。考虑将视频流同时传输到多个设备。因此,我不想在目标接收到第一帧之前继续发送下一帧。

嗯……实际上……如今,视频和音频传输都倾向于使用 UDP,并且故意设计为允许丢弃的数据包和帧。在 30(或 60)fps 源中丢失单个帧几乎不会引起注意,但是在重新获取旧帧的同时在视频上放置一个微调器是非常明显的。

因此,也许首先要考虑的是您可能希望(或需要)放宽您的要求。网络与硬连线控件不同,任何像这样的解决方案都是自然分布的,因此必须具有某种最终一致性或某种分区不容忍性(参见:CAP 定理)。

socket.SendAsync 似乎正在接收数据并在发送之前在本地缓冲它,在实际发送之前返回 TaskCompleted 。这意味着发送任务基本上是瞬间完成的。

这不仅仅是 websockets,或 .NET,甚至是 Windows。这是(故意)所有 TCP/IP API 的工作方式。“发送”在到达 OS 缓冲区时被视为完成。在 TCP/IP 世界中,您可以保证数据最终会到达接收者(可能在重试之后),或者您的套接字最终会进入错误状态。

我当然“可以”实现一个 ACK​​ 模式,接收者在收到整个数据包时发送一个 ACK​​,但这本身会减慢进程,我想避免这种情况,因为系统本身已经知道这一点,因为它确实会尝试连续发送排队的包裹。

Websockets 确实具有消息抽象并为您维护消息边界。但是 websocket 是通过 TCP/IP 实现的,这是一个流抽象(不是消息或数据包)。网络本身使用数据包抽象,而数据包是 ACK/RST/etc 发挥作用的地方。因此,操作系统知道数据包级别的 ACK/等,并将从 TCP/IP 流缓冲区中删除字节。但是 websockets 没有消息级别的 ACK,因此无法知道何时收到消息。

Websockets 是建立在 TCP/IP 之上的,所以你得到同样的保证:你知道发送的消息要么(最终)被接收,要么(最终)你会收到错误通知。这就是你所得到的。与 TCP/IP 的唯一主要区别是您获得消息抽象而不是流抽象。

因此,您的选择是:

  • 接受最终的一致性。同时向所有设备发送消息,并接受某些更新可能会延迟。
  • 接受分区不容忍。添加 ACK 响应消息,只有在所有设备都 ACK 后才发送下一次更新。(这不能容忍分区,因为如果一个设备断开连接,它们都会中断)。

如果您确实采用了 ACK-lockstep 方法,您可能需要考虑使用 UDP/IP 而不是基于 TCP/IP 的协议,但这需要大量工作。

同步执行这个真正的同步将是一个解决方案,但我似乎无法强制这样做。

同步 API 不是解决方案。没有 API 可以满足您的需求;任何地方的所有 TCP/IP 写入在到达操作系统时都认为写入“完成”;无论同步/异步、语言、运行时或操作系统如何,都是如此。

于 2022-02-07T17:09:27.493 回答