我有两个 .net Task 对象,我可能希望它们并行或按顺序运行。无论哪种情况,我都不想阻止线程等待它们。事实证明,响应式扩展使平行故事变得非常漂亮。但是当我尝试按顺序排列任务时,代码可以工作,但感觉很尴尬。





    public Task<string> DoWorkInParallel()
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
        Task<bool> BravoTask = Task.Factory.StartNew(() => true);

        //Prepare for Rx, and set filters to allow 'Zip' to terminate early
        //in some cases.
        IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
        IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

                (x, y) => y.ToString() + x.ToString())
                (x) => { result.TrySetResult(x); },
                (x) => { result.TrySetException(x.GetBaseException()); },
                () => { result.TrySetResult("Nothing"); });

        return result.Task;



    public Task<string> DoWorkInSequence()
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);

        AlphaTask.ContinueWith(x =>
            if (x.IsFaulted)
                if (x.Result != 5)
                    Task<bool> BravoTask = Task.Factory.StartNew(() => true);
                    BravoTask.ContinueWith(y =>
                        if (y.IsFaulted)
                            if (y.Result)
                                result.TrySetResult(x.Result.ToString() + y.Result.ToString());

        return result.Task;


要求(8/6 更新)


  1. 顺序方案应该允许第一个任务的输出馈送第二个任务的输入的安排。我上面的示例“尴尬”代码很容易被安排来实现这一点。

  2. 我对 .net 4.5 答案感兴趣 - 但 .net 4.0 答案对我来说同样重要或更重要。

  3. 任务“Alpha”和“Bravo”的总时限为 200 毫秒;他们每个人都没有 200 毫秒。在顺序情况下也是如此。

  4. 如果任一任务返回无效结果,SourceCompletionTask 必须在两个任务完成之前提前完成。如示例代码中的显式测试所示,无效结果是 [AlphaTask:5] 或 [BravoTask:false]。
    8/8 更新:澄清- 在顺序情况下,如果 AlphaTask 不成功或超时已经发生,则 BravoTask 根本不应该执行。

  5. 假设 AlphaTask 和 BravoTask 都不能阻塞。这并不重要,但在我的实际场景中,它们实际上是异步 WCF 服务调用。

也许我可以利用 Rx 的某个方面来清理顺序版本。但即使只是任务编程本身也应该有一个更好的故事,我想。走着瞧。

勘误表在两个代码示例中,我将返回类型更改为 Task,因为海报的答案非常正确,我不应该返回 TaskCompletionSource。


如果你可以使用 async/await,Brandon 有一个很好的答案。如果您仍在使用 VS2010,那么清理顺序版本的第一件事是获取一个扩展方法,如Then文中描述的方法 Stephen Toub 。如果您不使用 .NET 4.5,我还将实现一种方法。有了这些,您可以获得:Task.FromResult

public Task<string> DoWorkInSequence()
    return Task.FromResult(4)
           .Then(x => 
                 { if (x != 5)
                       return Task.FromResult(true)
                              .Then(y => 
                                    { if (y)
                                          return Task.FromResult(x.ToString() + y.ToString());
                                          return Task.FromResult("Nothing");
                        return Task.FromResult("Nothing");

此外,您通常应该返回 Task 而不是 TaskCompletionSource(您可以通过调用.TaskTaskCompletionSource 来获得),因为您不希望调用者为您返回给他们的任务设置结果。

Brandon 的回答还提供了一种实现超时功能的好方法(针对缺少 async/await 关键字进行调整)。

编辑 为了减少箭头代码,我们可以实现更多的 LINQ 方法。先前链接的博客文章中提供了 SelectMany 实现。LINQ 需要的其他方法是 Select 和 Where。一旦你完成了 Then 和 SelectMany,这些应该是相当简单的,但在这里它们是:

public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
    if (task == null) throw new ArgumentNullException("task");
    if (predicate == null) throw new ArgumentNullException("predicate");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
                    if (predicate(completed.Result))
                catch (Exception ex)
    return tcs.Task;

public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
    if (task == null) throw new ArgumentNullException("task");
    if (selector == null) throw new ArgumentNullException("selector");

    var tcs = new TaskCompletionSource<TResult>();
    task.ContinueWith((completed) =>
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
            catch (Exception ex)
    return tcs.Task;

之后,一种最终的非 LINQ 扩展方法允许在取消时返回默认值:

public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
    if (task == null) throw new ArgumentNullException("task");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
        else tcs.TrySetResult(completed.Result);
    return tcs.Task;

以及新的和改进的 DoWork(无超时):

public static Task<string> DoWorkInSequence()
    return (from x in Task_FromResult(5)
            where x != 5
            from y in Task_FromResult(true)
            where y
            select x.ToString() + y.ToString()

Brandon 的回答中的 Timeout 方法(一旦重写,如果需要没有异步/等待)可以卡在链的末尾以实现整体超时和/或在链中的每个步骤之后,如果您想阻止进一步的步骤运行一次已达到总体超时。链中断的另一种可能性是使所有单个步骤都采用取消令牌并修改 Timeout 方法以采用 CancellationTokenSource 并在发生超时时取消它,以及抛出超时异常。


从您提出的内容中汲取奇妙的想法,我设计了我认为是我的 POV 的最终答案。它基于ParallelExtensionsExtras的 nuget 包中的 .net 4.0 扩展方法。下面的示例添加了第三个任务,以帮助说明针对顺序任务进行编程的“感觉”,考虑到我提出的要求:

public Task<string> DoWorkInSequence()
    var cts = new CancellationTokenSource();

    Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });

    Task<int> AlphaTask = Task.Factory
        .StartNew(() => 4 )
        .Where(x => x != 5 && !cts.IsCancellationRequested);

    Task<bool> BravoTask = AlphaTask
        .Then(x => true)
        .Where(x => x && !cts.IsCancellationRequested);

    Task<int> DeltaTask = BravoTask
        .Then(x => 7)
        .Where(x => x != 8);

    Task<string> final = Task.Factory
        .WhenAny(DeltaTask, timer)
        .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
            ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");

    //This is here just for experimentation.  Placing it at different points
    //above will have varying effects on what tasks were cancelled at a given point in time.

    return final;


  • 在琐碎的情况下使用“Then”扩展名很好,但值得注意的是,它的适用性有限。对于更复杂的情况,有必要将其替换为,例如.ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default). 在我陈述的场景中将“Then”替换为“ContinueWith”时,添加该OnlyOnRanToCompletion选项至关重要。
  • 使用超时扩展最终在我的场景中不起作用。这是因为它只会导致它立即附加到的 Task 的取消,而不是取消序列中所有先前的 Task 实例。这就是为什么我改用这种策略并在每个子句StartNewDelayed(...)中添加快速取消检查的原因。Where
  • 尽管 ParallelExtensionsExtras 库定义了您使用的LINQ to Tasks,但我得出的结论是最好远离 LINQ-ish 的 Tasks 外观。这是因为使用 LINQ 的任务非常深奥;它可能会让普通开发人员感到困惑。让他们理解异步编码已经够难的了。甚至 LINQ to Tasks 的作者也说“这种 LINQ 实现在实践中的有用性是有争议的,但至少它提供了一个有趣的思考练习。” 是的,同意,一个有趣的思考练习。当然,我必须至少承认“Where”LINQ to Tasks 方法,因为它在我上面列出的解决方案中发挥了关键作用。
首先,我没有返回一个TaskCompletionSource. 这是达到目的的一种手段……您的方法的实现细节应该对公共 API 隐藏。您的方法应该返回 a Task(它应该只是 return result.Task)。

无论如何,如果你只是在处理任务,你应该只使用 TPL 而不是使用 Rx。仅当您确实需要将任务与其他 rx 代码集成时才使用 Rx。DoWorkInParallel如果你不混入 Rx 的东西,你甚至可以变得更简单。Rx 可以出色地处理复杂的任务内容。但是您描述的场景相对简单,可以通过 TPL 简单地解决。

以下是如何在 TPL 中执行并行和顺序版本:

/// <summary>Extension methods for timing out tasks</summary>
public static class TaskExtensions
    /// <summary> throws an error if task does not complete before the timer.</summary>
    public static async Task Timeout(this Task t, Task timer)
        var any = await Task.WhenAny(t, timer);
        if (any != t)
           throw new TimeoutException("task timed out");

    /// <summary> throws an error if task does not complete before the timer.</summary>
    public static async Task<T> Timeout<T>(this Task<T> t, Task timer)
        await Timeout((Task)t, timer);
        return t.Result;

    /// <summary> throws an error if task does not complete in time.</summary>
    public static Task Timeout(this Task t, TimeSpan delay)
        return t.IsCompleted ? t : Timeout(t, Task.Delay(delay));

    /// <summary> throws an error if task does not complete in time.</summary>
    public static Task<T> Timeout<T>(this Task<T> t, TimeSpan delay)
        return Timeout((Task)t, delay);

// .. elsewhere ..
public async Task<string> DoWorkInParallel()
    var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
    var alphaTask = Task.Run(() => 4);
    var betaTask = Task.Run(() => true);

    // wait for one of the tasks to complete
    var t = await Task.WhenAny(alphaTask, betaTask).Timeout(timer);

    // exit early if the task produced an invalid result
    if ((t == alphaTask && alphaTask.Result != 5) ||
        (t == betaTask && !betaTask.Result)) return "Nothing";

    // wait for the other task to complete
    // could also just write: await Task.WhenAll(alphaTask, betaTask).Timeout(timer);
    await ((t == alphaTask) ? (Task)betaTask : (Task)alphaTask).Timeout(timer);

    // unfortunately need to repeat the validation logic here.
    // this logic could be moved to a helper method that is just called in both places.
    var alpha = alphaTask.Result;
    var beta = betaTask.Result;
    return (alpha != 5 && beta) ? (alpha.ToString() + beta.ToString()) : "Nothing";

public async Task<string> DoWorkInSequence()
    var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
    var alpha = await Task.Run(() => 4).Timeout(timer);
    if (alpha != 5)
        var beta = await Task.Run(() => true).Timeout(timer);
        if (beta)
            return alpha.ToString() + beta.ToString();

    return "Nothing";

如果您需要在 .Net 4.0 中工作,那么您可以使用 Microsoft.Bcl.Async nuget 包,它允许您使用 VS2012 编译器来定位 .Net 4.0并仍然使用 async/await。请参阅这个 SO 问题:在 .net 4 上使用 async-await

编辑:如果任务产生无效值,我已经修改了代码以提前退出并行和顺序版本,并且我已经修改了超时以组合而不是每个任务。尽管在顺序情况下,此计时器也将计算2 个任务之间的时间。

Aron 几乎看准了它

public Task<string> DoWorkSequentially()
   Task<int> AlphaTask = Task.Run(() => 4);    //Some work;
   Task<bool> BravoTask = Task.Run(() => true);//Some other work;

   //Prepare for Rx, and set filters to allow 'Zip' to terminate early
   //in some cases.
   IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
   IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

    return (from alpha in AsyncAlpha
           from bravo in AsyncBravo
           select bravo.ToString() + alpha.ToString())
       .Concat(Observable.Return("Nothing"))   //Return Nothing if no result

这里我只是把BravoFunc后面的一个BravoTask. 我已经删除了TaskCompletionSource(就像 Aron 所做的那样)。最后,您使用ToTask()运算符将​​ Rx 延续转回 a Task<string>


    from alpha in AsyncAlpha
    from bravo in AsyncBravo
    select bravo.ToString() + alpha.ToString()


    AsyncAlpha.SelectMany(a=>AsyncBravo.Select(b=> b.ToString() + a.ToString()))

SelectMany 运算符对于这些类型的延续非常方便。在查询理解语法中更加方便,因为您仍然可以访问最终选择子句中的bravoand alpha

如您所见,一旦您有许多延续,这将变得非常有用。例如,考虑一个需要 3 或 4 个延续的示例

    from a in Alpha
    from b in Bravo
    from c in Charlie
    from d in Delta
    select a+b+c+d


    from isConnected in _server.ConnectionState.Where(c=>c)
    from session in _server.GetSession()
    from customer in _customerServiceClient.GetCustomers(session)
    select customer;

或者可能在我们需要进行身份验证的社交媒体提要中,找到联系人,获取他们的电子邮件列表,然后拉下这些电子邮件的前 20 个标题。

    from accessToken in _oauth.Authenticate()
    from contact in _contactServiceClient.GetContact(emailAddress, accessToken)
    from imapMessageId in _mailServiceClient.Search(contact).Take(20)
    from email in _mailServiceClient.GetEmailHeaders(imapMessageId)
    select email;
public Task<string> DoWorkInSequence()
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    Func<int> BravoFunc = x => 2 * x;

    //Prepare for Rx, and set filters to allow 'Zip' to terminate early
    //in some cases.
    IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);

    return AsyncAlpha
        .Do(x => Console.WriteLine(x))  //This is how you "Do WORK in sequence"
        .Select(BravoFunc)              //This is how you map results from Alpha
                                        //via a second method.
            (x) => { result.TrySetResult(x); },
            (x) => { result.TrySetException(x.GetBaseException()); },
            () => { result.TrySetResult("Nothing"); }).ToTask();

但最终,如果你想要任务,我实际上只是在 TPL 中完成所有这些,或者使用Observable.ToTask(this IObservable<T> observable)而不是使用TaskCompletionSource

于 2013-08-06T03:09:51.740 回答