3

我正在研究使用管道处理来自网络的二进制消息的可能性。我将要处理的二进制消息带有一个有效负载,最好将有效负载保持为二进制形式。

这个想法是读出整个消息并创建一个消息片段及其有效负载,一旦消息被完全读取,它将被传递到通道链进行处理,处理不会是即时的,可能需要一些时间或被执行稍后,目标是不要让管道阅读器等到处理完成,然后一旦消息处理完成,我需要将处理后的缓冲区释放给管道编写器。

现在我当然可以创建一个新的字节数组并复制来自管道写入器的数据,但这会超出不复制的目的吗?所以据我了解,我需要管道和通道之间的一些缓冲区同步?我观察了管道阅读器的可用 api ( AdvanceTo ),它可以告诉管道阅读器消耗了什么以及检查了什么,但无法解决如何在管道读取方法之外同步它。

所以问题是是否有一些技术或例子来说明如何实现这一点。

4

1 回答 1

2

TryRead/获得的缓冲区ReadAsync仅在您调用 之前有效AdvanceTo期望一旦您完成此操作:您报告为消耗的任何内容都可以回收以在其他地方使用(可能是并行/并发读取器)。严格来说:即使您没有报告为消耗的位:一旦您调用,您仍然不应该将其视为有效AdvanceTo(尽管实际上,它们很可能仍然是相同的段 - 只是:那是' t 调用者的关注;对调用者来说,它只在读取和提前之间有效)。

这意味着您明确不能这样做:

while (...)
{
    var result = await pipe.ReadAsync();
    if (TryIdentifyFrameBoundary(out var frame)) {
        BeginProcessingInBackground(frame); // <==== THIS IS A PROBLEM!
        reader.AdvanceTo(frame.End, frame.End);
    }
    else if { // take nothing
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted) break; // that's all folks
    }
}

因为“在后台”位,当它触发时,现在可能正在读取其他人的数据(由于它已经被重用)。

所以:要么您需要将帧内容作为读取循环的一部分进行处理,要么您将不得不制作数据的副本,很可能通过使用:

c#
var len = checked ((int)buffer.Length);
var oversized = ArrayPool<byte>.Shared.Rent(len);
buffer.CopyTo(oversized);

并传递oversized给您的后台处理,记住只查看len它的第一个字节。您可以将其作为 a 传递ReadOnlyMemory<byte>,但您需要考虑到之后您还希望将其返回到数组池(可能在一个finally块中),并将其作为内存传递会使其更加尴尬(但是并非不可能,感谢MemoryMarshal.TryGetArray)。


注意:在管道 API 的早期版本中,有一个引用计数元素,它确实允许您保留缓冲区,但它有一些问题:

  • 它极大地复杂化了 API
  • 它导致缓冲区泄漏
  • “保留”的含义模棱两可且令人困惑;在它被重用之前是计数吗?还是完全释放

所以这个功能被删除了。

于 2020-09-14T08:25:42.207 回答