0

我想为ChannelReader.ReadyAsync添加超时。这是我找到的两个解决方案:

var cts = new CancellationTokenSource();
cts.CancelAfter(2000);
try {
  var data = chan.ReadAsync(cts.Token);
} catch (OperationCanceledException) {
  // timeout
}
var tasks = new Task[] { Task.Delay(2000), chan.ReadAsync(CancellationToken.None) };
var completedTask = await Task.WhenAny(tasks);
if (completedTask == tasks[0])
  // timeout
else
  var data = ((T)completedTask).Result;

但是,这两种解决方案都不是免分配的。第一个分配一个 CancellationTokenSource,第二个分配一个 Timer 在 Task.Delay 中。有没有办法在没有任何分配的情况下制作类似的代码?

编辑 1:使用第一个解决方案时的 dotTrace 输出 调用树 分配类型

4

2 回答 2

1

该类ChannelReader<T>有一个ReadAllAsync方法可以将阅读器的数据公开为IAsyncEnumerable<T>. 下面是这个方法的一个重载,它也接受一个timeout参数。此参数的效果是,如果阅读器在指定的时间跨度内未能发出任何项目,TimeoutException则会抛出 a。

为了减少分配,它使用了Greg 的回答中相同的巧妙技术,CancellationTokenSource在每次迭代后重新安排取消分配。经过一番思考,我删除了这条线CancelAfter(int.MaxValue),因为它在一般情况下可能比有用更有害,但我可能错了。

public static async IAsyncEnumerable<TSource> ReadAllAsync<TSource>(
    this ChannelReader<TSource> source, TimeSpan timeout,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (true)
    {
        using var cts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);
        while (true)
        {
            try
            {
                if (!await source.WaitToReadAsync(cts.Token).ConfigureAwait(false))
                    yield break;
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                throw new TimeoutException();
            }
            while (source.TryRead(out var item))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }

            cts.CancelAfter(timeout);
            // It is possible that the CTS timed-out during the yielding
            if (cts.IsCancellationRequested) break; // Start a new loop with a new CTS
        }
    }
}

附带说明一下,System.Interactive.Async包包含一个Timeout具有如下签名的运算符,可以与内置的ReadAllAsync,并提供与上述实现相同的功能. 但是,该方法并未针对低分配进行优化。

public static IAsyncEnumerable<TSource> Timeout<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeout);

注意:回想起来这个ReadAllAsync().Timeout()想法是危险的,因为它ReadAllAsync是一种消耗方法。换句话说,枚举它具有从通道中删除项目的副作用。操作员不知道源序列内部发生了什么,Timeout因此在不幸的时刻发生超时可能会导致项目丢失。这使得顶部的实现成为问题的唯一可靠解决方案(在此答案的范围内)。

于 2020-03-18T11:34:24.273 回答
1

感谢您的回答,他们让我再次想起了我正在寻找的东西:重用CancellationTokenSource。一旦 aCancellationTokenSource被取消,您将无法重复使用它。但在我的情况下,ChannelReader.ReadAsync大部分时间会在超时触发之前返回,所以我使用CancelAfter了第二次调用它时不会重新创建计时器的事实,以避免取消CancellationTokenSource之后的ChannelReader.ReadAsync返回。

var timeoutCancellation = new CancellationTokenSource();

while (true)
{
    if (timeoutCancellation.IsCancellationRequested)
    {
        timeoutCancellation.Dispose();
        timeoutCancellation = new CancellationTokenSource();
    }

    T data;
    try
    {
        timeoutCancellation.CancelAfter(2000);
        data = await _queue.Reader.ReadAsync(timeoutCancellation.Token);
        // make sure it doesn't get cancelled so it can be reused in the next iteration
        // Timeout.Infinite won't work because it would delete the underlying timer
        timeoutCancellation.CancelAfter(int.MaxValue);
    }
    catch (OperationCanceledException) // timeout reached
    {
        // handle timeout
        continue;
    }

    // process data
}

这不是无分配的,但它大大减少了分配对象的数量。

于 2020-03-17T17:27:07.057 回答