我最近发现了 System.IO.Pipelines 命名空间,它看起来很有趣。我一直在尝试在一个简单的 TCP 服务器的上下文中实现 IDuplexPipe 接口,该服务器接受连接,然后与连接的客户端来回通信。
但是,我正在努力使其稳定。感觉好像我误解了一些基本的东西。我也一直在搜索接口的参考实现,以指导我朝着正确的方向前进。
据我所知,这是关于 System.IO.Pipelines 的最完整的文档。我在下面提供的示例大量借鉴了它。
- https://github.com/davidfowl/DocsStaging/blob/master/Pipelines.md
- https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/
我的问题:在 TCP 服务器的上下文中,IDuplexPipe 接口的典型实现是什么样的?
顺便说一句,这就是我目前所拥有的。这个想法是通过提供已建立的 SslStream 来设置新的“双工通信”:
public class DuplexCommunication : IDuplexPipe
{
public PipeReader Input => _receivePipe.Reader;
public PipeWriter Output => _transmitPipe.Writer;
private readonly SslStream _stream;
// Data received from the SslStream will end up on this pipe
private readonly Pipe _receivePipe = new Pipe();
// Data that is to be transmitted over the SslStream ends up on this pipe
private readonly Pipe _transmitPipe = new Pipe();
private readonly CancellationToken _cts;
private Task _receive;
private Task _transmit;
public DuplexCommunication(SslStream stream, CancellationToken cts)
{
_stream = stream;
_cts = cts;
_receive = Receive();
_transmit = Transmit();
}
private async Task Receive()
{
Exception error = null;
try
{
while (!_cts.IsCancellationRequested)
{
var buffer = _receivePipe.Writer.GetMemory(1);
var bytes = await _stream.ReadAsync(buffer, _cts);
_receivePipe.Writer.Advance(bytes);
if (bytes == 0) {
break;
}
var flush = await _receivePipe.Writer.FlushAsync(_cts);
if (flush.IsCompleted || flush.IsCanceled)
{
break;
}
}
}
catch (Exception ex)
{
// This might be "stream is closed" or similar, from when trying to read from the stream
Console.WriteLine($"DuplexPipe ReceiveTask caugth an exception: {ex.Message}");
error = ex;
}
finally
{
await _receivePipe.Writer.CompleteAsync(error);
}
}
private async Task Transmit()
{
Exception error = null;
try
{
while (!_cts.IsCancellationRequested)
{
var read = await _transmitPipe.Reader.ReadAsync(_cts);
var buffer = read.Buffer;
if (buffer.IsEmpty && read.IsCompleted)
{
break;
}
foreach (var segment in buffer)
{
await _stream.WriteAsync(segment, _cts);
}
_transmitPipe.Reader.AdvanceTo(buffer.End);
await _stream.FlushAsync(_cts);
}
}
catch (Exception e)
{
Console.WriteLine($"DuplexPipe Transmit caught an exception: {e.Message}");
error = e;
}
finally
{
await _transmitPipe.Reader.CompleteAsync(error);
}
}
}