1

在我的 .Net Core 应用程序中,第 3 方库中有一个写入System.IO.Stream接口的方法(它将流接口作为参数并写入它),但我希望该数据进入我的数据源,该数据源期望数据作为IAsyncEnumerable<bytes>溪流。我着手编写实现Stream接口的代码,因此当Write()被调用时它会写入IAsyncEnumerable<bytes>,然后认为“这必须在之前完成” - 似乎它是通用的。

那么在第 3 方库中是否有标准实现,或者我缺少任何“巧妙的技巧”?

4

1 回答 1

1

这是一个自定义Stream实现,用于异步生产者-消费者场景。它是一个只可写的流,只有通过特殊GetConsumingEnumerable方法才能读取(消费)它。

public class ProducerConsumerStream : Stream
{
    private readonly Channel<byte> _channel;

    public ProducerConsumerStream(bool singleReader = true, bool singleWriter = true)
    {
        _channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
        {
            SingleReader = singleReader,
            SingleWriter = singleWriter
        });
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() { }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
        => throw new NotSupportedException();

    public override void SetLength(long value)
        => throw new NotSupportedException();

    public override int Read(byte[] buffer, int offset, int count)
        => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (offset + count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));

        for (int i = offset; i < offset + count; i++)
            _channel.Writer.TryWrite(buffer[i]);
    }

    public override void WriteByte(byte value)
    {
        _channel.Writer.TryWrite(value);
    }

    public override void Close()
    {
        base.Close();
        _channel.Writer.Complete();
    }

    public IAsyncEnumerable<byte> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }
}

此实现基于Channel<byte>. 如果您对频道不熟悉,这里有一个教程

于 2021-07-05T19:00:16.717 回答