4

我有一个List<Task<bool>>我想并行枚举的任务,找到要完成的第一个任务,结果是true不等待或观察任何其他仍待处理的任务的异常。

var tasks = new List<Task<bool>>
{ 
    Task.Delay(2000).ContinueWith(x => false), 
    Task.Delay(0).ContinueWith(x => true), 
};

我曾尝试使用 PLINQ 执行以下操作:

var task = tasks.AsParallel().FirstOrDefault(t => t.Result);

它并行执行,但一旦找到令人满意的结果就不会返回。因为访问 Result 属性是阻塞的。为了让它使用 PLINQ 工作,我必须写下这个令人敬畏的声明:

var cts = new CancellationTokenSource();
var task = tasks.AsParallel()
    .FirstOrDefault(t =>
    {
        try 
        { 
            t.Wait(cts.Token);
            if (t.Result)
            {
                cts.Cancel();
            }

            return t.Result;
        } 
        catch (OperationCanceledException) 
        { 
            return false;
        }
    } );

我编写了一个扩展方法,可以在完成任务时生成任务。

public static class Exts
{
    public static IEnumerable<Task<T>> InCompletionOrder<T>(this IEnumerable<Task<T>> source)
    {
        var tasks = source.ToList();
        while (tasks.Any())
        {
            var t = Task.WhenAny(tasks);
            yield return t.Result;
            tasks.Remove(t.Result);
        }
    }
}

// and run like so
var task = tasks.InCompletionOrder().FirstOrDefault(t => t.Result);

但感觉这很常见,有更好的方法。建议?

4

3 回答 3

3

也许是这样的?

var tcs = new TaskCompletionSource<Task<bool>>();

foreach (var task in tasks)
{
    task.ContinueWith((t, state) =>
    {
        if (t.Result)
        {
            ((TaskCompletionSource<Task<bool>>)state).TrySetResult(t);
        }
    },
        tcs,
        TaskContinuationOptions.OnlyOnRanToCompletion |
        TaskContinuationOptions.ExecuteSynchronously);
}

var firstTaskToComplete = tcs.Task;
于 2013-02-06T01:54:16.297 回答
1

也许你可以试试 Rx.Net 库。它实际上非常适合提供 Linq to Work。

引用 Microsoft Rx.Net 程序集后,在 LinqPad 中尝试此代码段。

using System
using System.Linq
using System.Reactive.Concurrency
using System.Reactive.Linq
using System.Reactive.Threading.Tasks
using System.Threading.Tasks

void Main()
{
    var tasks = new List<Task<bool>>
    { 
        Task.Delay(2000).ContinueWith(x => false), 
        Task.Delay(0).ContinueWith(x => true), 
    };

    var observable = (from t in tasks.ToObservable()
                      //Convert task to an observable
                      let o = t.ToObservable()
                      //SelectMany
                      from x in o
                      select x);


    var foo = observable
                .SubscribeOn(Scheduler.Default) //Run the tasks on the threadpool
                .ToList()
                .First();

    Console.WriteLine(foo);
}
于 2013-02-06T02:07:07.697 回答
1

首先,我不明白你为什么要在这里使用 PLINQ。枚举一个Tasks 列表应该不会花很长时间,所以我认为你不会从并行化它中获得任何好处。

现在,要获得Task已经完成的第一个true,您可以使用(非阻塞)IsCompleted属性

var task = tasks.FirstOrDefault(t => t.IsCompleted && t.Result);

如果您想获得Task按完成排序的 s 集合,请查看 Stephen Toub 的文章Processing tasks as they complete。如果要列出true首先返回的那些,则需要修改该代码。如果您不想修改它,可以使用Stephen Cleary 的 AsyncEx 库中的此方法的一个版本


此外,在您问题的特定情况下,您可以通过添加.WithMergeOptions(ParallelMergeOptions.NotBuffered)到 PLINQ 查询来“修复”您的代码。但是这样做在大多数情况下仍然不起作用,即使这样做也会浪费很多线程。这是因为 PLINQ 使用恒定数量的线程,而分区和使用Result会在大多数情况下阻塞这些线程。

于 2013-02-06T09:14:09.623 回答