18

我们的应用程序使用 TPL 序列化(可能)长时间运行的工作单元。工作(任务)的创建是用户驱动的,可以随时取消。为了有一个响应式的用户界面,如果当前的工作不再需要,我们想放弃我们正在做的事情,并立即开始一个不同的任务。

任务排队是这样的:

private Task workQueue;
private void DoWorkAsync
    (Action<WorkCompletedEventArgs> callback, CancellationToken token) 
{
   if (workQueue == null)
   {
      workQueue = Task.Factory.StartWork
          (() => DoWork(callback, token), token);
   }
   else 
   {
      workQueue.ContinueWork(t => DoWork(callback, token), token);
   }
}

DoWork方法包含一个长时间运行的调用,因此它并不像在token.IsCancellationRequested检测到取消时不断检查状态和放弃那样简单。即使任务被取消,长时间运行的工作也会阻止任务继续直到它完成。

我想出了两个示例方法来解决这个问题,但我不相信任何一个都是正确的。我创建了简单的控制台应用程序来演示它们是如何工作的。

需要注意的重要一点是在原始任务完成之前继续触发

尝试#1:内部任务

static void Main(string[] args)
{
   CancellationTokenSource cts = new CancellationTokenSource();
   var token = cts.Token;
   token.Register(() => Console.WriteLine("Token cancelled"));
   // Initial work
   var t = Task.Factory.StartNew(() =>
     {
        Console.WriteLine("Doing work");

      // Wrap the long running work in a task, and then wait for it to complete
      // or the token to be cancelled.
        var innerT = Task.Factory.StartNew(() => Thread.Sleep(3000), token);
        innerT.Wait(token);
        token.ThrowIfCancellationRequested();
        Console.WriteLine("Completed.");
     }
     , token);
   // Second chunk of work which, in the real world, would be identical to the
   // first chunk of work.
   t.ContinueWith((lastTask) =>
         {
             Console.WriteLine("Continuation started");
         });

   // Give the user 3s to cancel the first batch of work
   Console.ReadKey();
   if (t.Status == TaskStatus.Running)
   {
      Console.WriteLine("Cancel requested");
      cts.Cancel();
      Console.ReadKey();
   }
}

这行得通,但“innerT”任务对我来说感觉非常笨拙。它还有一个缺点,就是迫使我重构以这种方式排队工作的代码的所有部分,因为必须将所有长时间运行的调用包装在一个新任务中。

尝试 #2:TaskCompletionSource 修补

static void Main(string[] args)
{  var tcs = new TaskCompletionSource<object>();
//Wire up the token's cancellation to trigger the TaskCompletionSource's cancellation
   CancellationTokenSource cts = new CancellationTokenSource();
   var token = cts.Token;
   token.Register(() =>
         {   Console.WriteLine("Token cancelled");
             tcs.SetCanceled();
          });
   var innerT = Task.Factory.StartNew(() =>
      {
          Console.WriteLine("Doing work");
          Thread.Sleep(3000);
          Console.WriteLine("Completed.");
    // When the work has complete, set the TaskCompletionSource so that the
    // continuation will fire.
          tcs.SetResult(null);
       });
   // Second chunk of work which, in the real world, would be identical to the
   // first chunk of work.
   // Note that we continue when the TaskCompletionSource's task finishes,
   // not the above innerT task.
   tcs.Task.ContinueWith((lastTask) =>
      {
         Console.WriteLine("Continuation started");
      });
   // Give the user 3s to cancel the first batch of work
   Console.ReadKey();
   if (innerT.Status == TaskStatus.Running)
   {
      Console.WriteLine("Cancel requested");
      cts.Cancel();
      Console.ReadKey();
   }
}

这再次有效,但现在我有两个问题:

a) 感觉就像我在滥用 TaskCompletionSource,从不使用它的结果,并且在我完成工作时设置为 null。

b) 为了正确连接延续,我需要处理前一个工作单元的唯一 TaskCompletionSource,而不是为其创建的任务。这在技术上是可行的,但又让人觉得笨重和奇怪。

然后去哪儿?

重申一下,我的问题是:这些方法中的任何一种都是解决这个问题的“正确”方法,还是有更正确/优雅的解决方案可以让我过早中止长期运行的任务并立即开始继续?我更喜欢低影响的解决方案,但如果这是正确的做法,我愿意进行一些大规模的重构。

或者,TPL 是否是该工作的正确工具,或者我是否缺少更好的任务排队机制。我的目标框架是 .NET 4.0。

4

1 回答 1

9

这里真正的问题是 DoWork 中长时间运行的调用无法感知取消。如果我理解正确,您在这里所做的并不是真正取消长期运行的工作,而只是允许继续执行,并且当取消任务的工作完成时,忽略结果。例如,如果您使用内部任务模式调用 CrunchNumbers(),这需要几分钟,取消外部任务将允许继续发生,但 CrunchNumbers() 将继续在后台执行直到完成。

除了让您的长期通话支持取消之外,我认为没有任何真正的解决方法。通常这是不可能的(他们可能会阻塞 API 调用,没有 API 支持取消。)在这种情况下,这确实是 API 中的一个缺陷;您可以检查是否有替代 API 调用可用于以可以取消的方式执行操作。一种破解方法是在任务启动时捕获对任务正在使用的底层线程的引用,然后调用 Thread.Interrupt。这会将线程从各种睡眠状态中唤醒并允许它终止,但可能会以一种令人讨厌的方式。最坏的情况下,您甚至可以调用 Thread.Abort,但这会带来更大的问题并且不推荐。


这是对基于委托的包装器的尝试。它未经测试,但我认为它可以解决问题;如果您使其工作并进行修复/改进,请随时编辑答案。

public sealed class AbandonableTask
{
    private readonly CancellationToken _token;
    private readonly Action _beginWork;
    private readonly Action _blockingWork;
    private readonly Action<Task> _afterComplete;

    private AbandonableTask(CancellationToken token, 
                            Action beginWork, 
                            Action blockingWork, 
                            Action<Task> afterComplete)
    {
        if (blockingWork == null) throw new ArgumentNullException("blockingWork");

        _token = token;
        _beginWork = beginWork;
        _blockingWork = blockingWork;
        _afterComplete = afterComplete;
    }

    private void RunTask()
    {
        if (_beginWork != null)
            _beginWork();

        var innerTask = new Task(_blockingWork, 
                                 _token, 
                                 TaskCreationOptions.LongRunning);
        innerTask.Start();

        innerTask.Wait(_token);
        if (innerTask.IsCompleted && _afterComplete != null)
        {
            _afterComplete(innerTask);
        }
    }

    public static Task Start(CancellationToken token, 
                             Action blockingWork, 
                             Action beginWork = null, 
                             Action<Task> afterComplete = null)
    {
        if (blockingWork == null) throw new ArgumentNullException("blockingWork");

        var worker = new AbandonableTask(token, beginWork, blockingWork, afterComplete);
        var outerTask = new Task(worker.RunTask, token);
        outerTask.Start();
        return outerTask;
    }
}
于 2011-01-20T16:00:46.897 回答