6

我发现如果不检查特定任务或委托背后的来源,我无法区分受控/合作与“不受控制”取消任务/委托。

具体来说,我一直假设当OperationCanceledException从“较低级别的操作”中捕获抛出时,如果引用的令牌无法与当前操作的令牌匹配,那么它应该被解释为失败/错误。这是它放弃(退出)的“较低级别操作”的声明,但不是因为您要求它这样做

不幸的是,TaskCompletionSource不能将 aCancellationToken作为取消的原因。因此,任何不受内置调度程序支持的任务都无法传达其取消的原因,并且可能会将协作取消误报为错误。

更新:从 .NET 4.6 开始,TaskCompletionSource可以关联 aCancellationToken 如果新的重载为SetCanceledorTrySetCanceled被使用。

例如以下

public Task ShouldHaveBeenAsynchronous(Action userDelegate, CancellationToken ct)
{
    var tcs = new TaskCompletionSource<object>();

    try
    {
      userDelegate();
      tcs.SetResult(null);   // Indicate completion
    }
    catch (OperationCanceledException ex)
    {
      if (ex.CancellationToken == ct)
        tcs.SetCanceled(); // Need to pass ct here, but can't
      else
        tcs.SetException(ex);
    }
    catch (Exception ex)
    {
      tcs.SetException(ex);
    }

    return tcs.Task;
}

private void OtherSide()
{
    var cts = new CancellationTokenSource();
    var ct = cts.Token;
    cts.Cancel();
    Task wrappedOperation = ShouldHaveBeenAsynchronous(
        () => { ct.ThrowIfCancellationRequested(); }, ct);

    try
    {
        wrappedOperation.Wait();
    }
    catch (AggregateException aex)
    {
        foreach (var ex in aex.InnerExceptions
                              .OfType<OperationCanceledException>())
        {
            if (ex.CancellationToken == ct)
                Console.WriteLine("OK: Normal Cancellation");
            else
                Console.WriteLine("ERROR: Unexpected cancellation");
        }
    }
}

即使通过分发给所有组件的取消令牌请求取消,也会导致“错误:意外取消”。

核心问题是TaskCompletionSource不知道CancellationToken,但是如果用于在Tasks中包装异步操作的“转到”机制无法跟踪这一点,那么我认为人们不能指望它会被跨接口跟踪(图书馆)边界。

事实上 TaskCompletionSource 可以处理这个,但是必要的 TrySetCanceled 重载是内部的,所以只有 mscorlib 组件可以使用它。

那么有没有人有一种模式来传达取消已跨任务和委托边界“处理”?

4

3 回答 3

2

我发现如果不检查它们是如何实现的细节,我就无法区分任务/委托的受控取消和“不受控”取消。

此外,您在等待或等待任务时捕获OperationCanceledException异常的事实并不一定意味着该任务StatusTaskStatus.Canceled. 它也可能是TaskStatus.Faulted

可能有几个选项可以实现您所追求的。我会使用ContinueWith并将该延续任务传递给客户端代码,而不是原始代码TaskCompletionSource.Task

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public static class TaskExt
    {
        public static Task<TResult> TaskWithCancellation<TResult>(
            this TaskCompletionSource<TResult> @this,
            CancellationToken token)
        {
            var registration = token.Register(() => @this.TrySetCanceled());
            return @this.Task.ContinueWith(
                task => { registration.Dispose(); return task.Result; },
                token, 
                TaskContinuationOptions.LazyCancellation | 
                    TaskContinuationOptions.ExecuteSynchronously, 
                TaskScheduler.Default);
        }
    }

    class Program
    {
        static async Task OtherSideAsync(Task task, CancellationToken token)
        {
            try
            {
                await task;
            }
            catch (OperationCanceledException ex)
            {
                if (token != ex.CancellationToken)
                    throw;
                Console.WriteLine("Cancelled with the correct token");
            }
        }

        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource(1000); // cancel in 1s
            var tcs = new TaskCompletionSource<object>();

            var taskWithCancellation = tcs.TaskWithCancellation(cts.Token);
            try
            {
                OtherSideAsync(taskWithCancellation, cts.Token).Wait();
            }
            catch (AggregateException ex)
            {
                Console.WriteLine(ex.InnerException.Message);
            }
            Console.ReadLine();
        }
    }
}

注意 的使用TaskContinuationOptions.LazyCancellation,它可以确保在任务之前永远不会完成延续tcs.Task任务(当通过 请求取消时token)。

另请注意,如果tcs.TrySetCanceled在通过 请求取消之前调用token,则生成的任务将处于错误状态而不是取消状态(taskWithCancellation.IsFaulted == true但是taskWithCancellation.IsCancelled == false)。如果您希望为隐式token和显式tcs.TrySetCanceled取消传播取消状态,请像这样更改TaskWithCancellation扩展名:

public static Task<TResult> TaskWithCancellation<TResult>(
    this TaskCompletionSource<TResult> @this,
    CancellationToken token)
{
    var registration = token.Register(() => @this.TrySetCanceled());
    return @this.Task.ContinueWith(
        task => { registration.Dispose(); return task; },
        token,
        TaskContinuationOptions.LazyCancellation | 
            TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default).Unwrap();
}

更新以解决评论:

基于 的库 API的典型设计Task是客户端代码向 API 提供取消令牌,并且 API 返回与提供的令牌Task相关联的 。然后,API 的客户端代码可以在捕获取消异常时进行令牌匹配。

的确切目的TaskWithCancellation是创建此类Task并将其返回给客户端。原件TaskCompletionSource.Task永远不会暴露给客户。取消发生是因为令牌被传递给ContinueWith,这就是它与继续任务相关联的方式。OTOH,token.RegisterTrySetCanceledTaskContinuationOptions.LazyCancellation用于确保事情以正确的顺序发生,包括注册清理。

于 2014-05-15T05:29:20.660 回答
2

仅作记录:这已在 .NET framework 4.6 及更高版本中修复 TaskCompletionSource.TrySetCanceled 方法(CancellationToken)

于 2015-09-06T10:08:00.797 回答
1

记录在案:是的,API 已损坏,因为 TaskCompletionSource 应接受 CancellationToken。.NET 运行时修复了这个问题以供自己使用,但在 .NET 4.6 之前没有公开修复(TrySetCanceled 的重载) 。

作为任务消费者,有两个基本选项。

  1. 始终检查 Task.Status
  2. 如果请求取消,只需检查您自己的 CancellationToken 并忽略任务错误。

所以像:

object result;
try
{
    result = task.Result;
}
// catch (OperationCanceledException oce) // don't rely on oce.CancellationToken
catch (Exception ex)
{
    if (task.IsCancelled)
        return; // or otherwise handle cancellation

    // alternatively
    if (cancelSource.IsCancellationRequested)
        return; // or otherwise handle cancellation

    LogOrHandleError(ex);
}

第一个依赖于库编写者使用 TaskCompletionSource.TrySetCanceled 而不是使用提供匹配令牌的 OperationCanceledException 执行 TrySetException。

第二个不依赖库编写者做任何“正确”的事情,除了做任何必要的事情来处理他们的代码异常。这可能无法记录错误以进行故障排除,但无论如何都无法(合理地)从内部外部代码中清理操作状态。

对于任务生产者,可以

  1. 尝试通过使用反射将令牌与任务取消相关联来兑现 OperationCanceledException.CancellationToken 合同。
  2. 使用 Continuation 将令牌与返回的任务相关联。

后者很简单,但像 Consumer 选项 2 可能会忽略任务错误(甚至在执行序列停止之前很久就将任务标记为已完成)。

两者的完整实现(包括缓存委托以避免反射)......

更新:对于 .NET 4.6 及更高版本,只需调用TaskCompletionSource.TrySetCanceled接受.NET 的新公共重载即可CancellationToken。当与 .NET 4.6 链接时,使用下面扩展方法的代码将自动切换到该重载(如果使用扩展方法语法进行调用)。

static class TaskCompletionSourceExtensions
{
    /// <summary>
    /// APPROXIMATION of properly associating a CancellationToken with a TCS
    /// so that access to Task.Result following cancellation of the TCS Task 
    /// throws an OperationCanceledException with the proper CancellationToken.
    /// </summary>
    /// <remarks>
    /// If the TCS Task 'RanToCompletion' or Faulted before/despite a 
    /// cancellation request, this may still report TaskStatus.Canceled.
    /// </remarks>
    /// <param name="this">The 'TCS' to 'fix'</param>
    /// <param name="token">The associated CancellationToken</param>
    /// <param name="LazyCancellation">
    /// true to let the 'owner/runner' of the TCS complete the Task
    /// (and stop executing), false to mark the returned Task as Canceled
    /// while that code may still be executing.
    /// </param>
    public static Task<TResult> TaskWithCancellation<TResult>(
        this TaskCompletionSource<TResult> @this,
        CancellationToken token,
        bool lazyCancellation)
    {
        if (lazyCancellation)
        {
            return @this.Task.ContinueWith(
                (task) => task,
                token,
                TaskContinuationOptions.LazyCancellation |
                    TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default).Unwrap();
        }

        return @this.Task.ContinueWith((task) => task, token).Unwrap();
        // Yep that was a one liner!
        // However, LazyCancellation (or not) should be explicitly chosen!
    }


    /// <summary>
    /// Attempts to transition the underlying Task into the Canceled state
    /// and set the CancellationToken member of the associated 
    /// OperationCanceledException.
    /// </summary>
    public static bool TrySetCanceled<TResult>(
        this TaskCompletionSource<TResult> @this,
        CancellationToken token)
    {
        return TrySetCanceledCaller<TResult>.MakeCall(@this, token);
    }

    private static class TrySetCanceledCaller<TResult>
    {
        public delegate bool MethodCallerType(TaskCompletionSource<TResult> inst, CancellationToken token);

        public static readonly MethodCallerType MakeCall;

        static TrySetCanceledCaller()
        {
            var type = typeof(TaskCompletionSource<TResult>);

            var method = type.GetMethod(
                "TrySetCanceled",
                System.Reflection.BindingFlags.Instance |
                System.Reflection.BindingFlags.NonPublic,
                null,
                new Type[] { typeof(CancellationToken) },
                null);

            MakeCall = (MethodCallerType)
                Delegate.CreateDelegate(typeof(MethodCallerType), method);
        }
    }
}

和测试程序...

class Program
{
    static void Main(string[] args)
    {
        //var cts = new CancellationTokenSource(6000); // To let the operation complete
        var cts = new CancellationTokenSource(1000);
        var ct = cts.Token;
        Task<string> task = ShouldHaveBeenAsynchronous(cts.Token);

        try
        {
            Console.WriteLine(task.Result);
        }
        catch (AggregateException aex)
        {
            foreach (var ex in aex.Flatten().InnerExceptions)
            {
                var oce = ex as OperationCanceledException;
                if (oce != null)
                {
                    if (oce.CancellationToken == ct)
                        Console.WriteLine("OK: Normal Cancellation");
                    else
                        Console.WriteLine("ERROR: Unexpected cancellation");
                }
                else
                {
                    Console.WriteLine("ERROR: " + ex.Message);
                }
            }
        }

        Console.Write("Press Enter to Exit:");
        Console.ReadLine();
    }

    static Task<string> ShouldHaveBeenAsynchronous(CancellationToken ct)
    {
        var tcs = new TaskCompletionSource<string>();

        try
        {
            //throw new NotImplementedException();

            ct.WaitHandle.WaitOne(5000);
            ct.ThrowIfCancellationRequested();
            tcs.TrySetResult("this is the result");
        }
        catch (OperationCanceledException ex)
        {
            if (ex.CancellationToken == ct)
                tcs.TrySetCanceled(ct);
            else
                tcs.TrySetException(ex);
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }

        return tcs.Task;
        //return tcs.TaskWithCancellation(ct, false);
    }
}
于 2014-08-25T02:36:13.197 回答