该类ChannelReader<T>
有一个ReadAllAsync
方法可以将阅读器的数据公开为IAsyncEnumerable<T>
. 下面是这个方法的一个重载,它也接受一个timeout
参数。此参数的效果是,如果阅读器在指定的时间跨度内未能发出任何项目,TimeoutException
则会抛出 a。
为了减少分配,它使用了Greg 的回答中相同的巧妙技术,CancellationTokenSource
在每次迭代后重新安排取消分配。经过一番思考,我删除了这条线CancelAfter(int.MaxValue)
,因为它在一般情况下可能比有用更有害,但我可能错了。
public static async IAsyncEnumerable<TSource> ReadAllAsync<TSource>(
this ChannelReader<TSource> source, TimeSpan timeout,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (true)
{
using var cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
while (true)
{
try
{
if (!await source.WaitToReadAsync(cts.Token).ConfigureAwait(false))
yield break;
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException();
}
while (source.TryRead(out var item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
cts.CancelAfter(timeout);
// It is possible that the CTS timed-out during the yielding
if (cts.IsCancellationRequested) break; // Start a new loop with a new CTS
}
}
}
附带说明一下,System.Interactive.Async包包含一个Timeout
具有如下签名的运算符,可以与内置的ReadAllAsync
,并提供与上述实现相同的功能. 但是,该方法并未针对低分配进行优化。
public static IAsyncEnumerable<TSource> Timeout<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeout);
注意:回想起来这个ReadAllAsync().Timeout()
想法是危险的,因为它ReadAllAsync
是一种消耗方法。换句话说,枚举它具有从通道中删除项目的副作用。操作员不知道源序列内部发生了什么,Timeout
因此在不幸的时刻发生超时可能会导致项目丢失。这使得顶部的实现成为问题的唯一可靠解决方案(在此答案的范围内)。