这是一个自定义Stream
实现,用于异步生产者-消费者场景。它是一个只可写的流,只有通过特殊GetConsumingEnumerable
方法才能读取(消费)它。
public class ProducerConsumerStream : Stream
{
private readonly Channel<byte> _channel;
public ProducerConsumerStream(bool singleReader = true, bool singleWriter = true)
{
_channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
{
SingleReader = singleReader,
SingleWriter = singleWriter
});
}
public override bool CanRead { get { return false; } }
public override bool CanSeek { get { return false; } }
public override bool CanWrite { get { return true; } }
public override long Length { get { throw new NotSupportedException(); } }
public override void Flush() { }
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override long Seek(long offset, SeekOrigin origin)
=> throw new NotSupportedException();
public override void SetLength(long value)
=> throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
if (offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));
for (int i = offset; i < offset + count; i++)
_channel.Writer.TryWrite(buffer[i]);
}
public override void WriteByte(byte value)
{
_channel.Writer.TryWrite(value);
}
public override void Close()
{
base.Close();
_channel.Writer.Complete();
}
public IAsyncEnumerable<byte> GetConsumingEnumerable(
CancellationToken cancellationToken = default)
{
return _channel.Reader.ReadAllAsync(cancellationToken);
}
}
此实现基于Channel<byte>
. 如果您对频道不熟悉,这里有一个教程。