概述
我正在尝试IAsyncEnumerable<T>
围绕接口编写一个包装器IObserver<T>
。起初我使用 aBufferBlock<T>
作为后备数据存储,但通过性能测试和研究发现它实际上是一个非常慢的类型,所以我决定System.Threading.Channels.Channel
试一试。我的 BufferBlock 实现有一个与这个类似的问题,但这次我不知道如何解决它。
问题
如果我的方法尚未写入,我GetAsyncEnumerator()
的循环会被await _channel.Reader.WaitToRead(token)
调用阻塞。在不阻塞程序执行的情况下等待一个值在此上下文中可用的正确方法是什么?IObserver<T>.OnNext()
_channel
执行
public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>, IObserver<T>, IDisposable
{
private readonly IDisposable _unsubscriber;
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private bool _producerComplete;
public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
{
_unsubscriber = provider.Subscribe(this);
}
public async void OnNext(T value)
{
Log.Logger.Verbose("Adding value to Channel.");
await _channel.Writer.WriteAsync(value);
}
public void OnError(Exception error)
{
_channel.Writer.Complete(error);
}
public void OnCompleted()
{
_producerComplete = true;
}
public async IAsyncEnumerator<T> GetAsyncEnumerator([EnumeratorCancellation] CancellationToken token = new CancellationToken())
{
Log.Logger.Verbose("Starting async iteration...");
while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
{
Log.Logger.Verbose("Reading...");
while (_channel.Reader.TryRead(out var item))
{
Log.Logger.Verbose("Yielding item.");
yield return item;
}
Log.Logger.Verbose("Awaiting more items.");
}
Log.Logger.Verbose("Iteration Complete.");
_channel.Writer.Complete();
}
public void Dispose()
{
_channel.Writer.Complete();
_unsubscriber?.Dispose();
}
}
附加上下文
没关系,但在运行时IObservable<T>
传递给构造函数的实例是CimAsyncResult
从对 api 进行的异步调用返回的Microsoft.Management.Infrastructure
。这些使用了我试图用花哨的新异步枚举模式包装的 Observer 设计模式。
编辑
更新了对调试器输出的日志记录,并OnNext()
按照一位评论者的建议使我的方法异步/等待。你可以看到它永远不会进入while()
循环。