7

编辑:这个问题的要求已经改变。请参阅下面的更新部分。

我有一个异步迭代器方法,它每 200 毫秒产生一个IAsyncEnumerable<int>(数字流)一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。因此CancellationTokenSource使用了 a,并且令牌作为参数传递给WithCancellation扩展方法。但是令牌不受尊重。枚举一直持续到所有数字都被消耗完:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

输出:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12 :55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

预期的输出是TaskCanceledException在数字 5 之后发生。看来我误解了WithCancellation实际在做什么。该方法只是将提供的令牌传递给迭代器方法,如果该方法接受一个。否则,就像GetSequence()我的示例中的方法一样,令牌将被忽略。我想我的解决方案是手动查询枚举体内的令牌:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

这很简单并且效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来做我期望WithCancellation做的事情,在随后的枚举中烘焙令牌。这是所需方法的签名:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}

更新:似乎当我问这个问题时,我对整个取消概念的目的有一个不正确的理解。我的印象是取消是为了在等待之后打破循环MoveNextAsync,而真正的目的是取消等待本身。在我的简单示例中,等待仅持续 200 毫秒,但在现实世界的示例中,等待可能更长,甚至是无限的。意识到这一点后,我现在的问题几乎没有价值,我必须要么删除它并打开一个具有相同标题的新问题,要么更改现有问题的要求。这两种选择都以某种方式不好。

我决定选择第二个选项。因此,我不接受当前接受的答案,并且我正在寻求一种新的解决方案,以解决以立即生效的方式执行取消的更困难的问题。换句话说,取消令牌应该会导致异步枚举在几毫秒内完成。让我们举一个实际的例子来区分合意和不合意的行为:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}

输出(理想):

0:00.242 > 1
0:00.467 > 2
0:00.500 > 取消

输出(不良):

0:00.242 > 1
0:00.467 > 2
0:00.707 > 取消

GetSequence与初始示例中的方法相同,每 200 毫秒传输一个数字。此方法不支持取消,前提是我们无法更改。WithEnforcedCancellation是应该解决此问题的必需扩展方法。

4

3 回答 3

16

IAsyncEnumerable显式地为这种机制提供了以下EnumeratorCancellation属性:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

事实上,如果你给方法一个CancellationToken参数,但不添加属性,编译器就会发出警告。

请注意,传递给的令牌.WithCancellation将覆盖传递给该方法的任何本地令牌。规格有这方面的详细信息。

当然,这仍然只有在枚举实际上接受 a 时才有效CancellationToken——但取消只有在合作完成时才真正有效的事实适用于任何async工作。Yeldar 的回答有利于将某种取消措施“强制”到不支持它的可枚举中,但首选的解决方案应该是修改枚举以自行支持取消——编译器会尽一切努力帮助您。

于 2019-10-04T13:12:13.437 回答
4

您可以将您的逻辑提取到这样的扩展方法中:

public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    await foreach (var item in source)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    }
}
于 2019-10-04T10:50:41.620 回答
2

我认为重申你应该这样做很重要。让异步方法支持取消令牌总是更好,然后取消是您所期望的那样立即。如果那不可能,我仍然建议在尝试此答案之前尝试其他答案之一。

话虽如此,如果您无法向 async 方法添加取消支持,并且您确实需要立即终止foreach,那么您可以绕过它。

一个技巧是使用Task.WhenAny两个参数:

  1. 你得到的任务IAsyncEnumerator.MoveNextAsync()
  2. 另一个支持取消的任务

这是简短的版本

// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
    yield return enumerator.Current;
}

ConfigureAwait(false)具有完整性和完整性的长版本,DisposeAsync()如果您在本地运行它应该可以工作。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        {
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (moveNext != null && !moveNext.IsCompleted)
            {
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!

                moveNext.ContinueWith(async _ =>
                {
                    await enumerator.DisposeAsync();
                }, TaskScheduler.Default);
#pragma warning restore 4014
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync();
            }
        }
    }

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    }
}

public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}

注意事项

枚举器返回一个 ValueTask 以提高性能(使用比常规任务更少的分配),但 ValueTask 不能与 一起使用Task.WhenAny(),因此AsTask()使用它会通过引入分配开销而降低性能。

只有完成了最近的枚举器,才能释放枚举器MoveNextAsync()。当请求取消时,任务更有可能仍在运行。DisposeAsync这就是为什么我在延续任务中添加了另一个调用。

WithEnforcedCancellation()在这种情况下,当方法退出时,枚举器还没有被释放。它将在枚举被放弃后的一段时间内被处理。如果DisposeAsync()抛出异常,异常将丢失。它不能冒泡调用堆栈,因为没有调用堆栈。

于 2021-09-02T00:49:38.140 回答