8

我正在尝试使用C# 8提供的新工具更新我的工具集,其中一种似乎特别有用的方法是Task.WhenAll返回IAsyncEnumerable. 此方法应在任务结果可用时立即对其进行流式传输,因此命名它WhenAll没有多大意义。WhenEach听起来更合适。该方法的签名是:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

这种方法可以像这样使用:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}

预期输出:

已处理:5
已处理:4
已处理:1
已处理:3
已处理:2

我设法Task.WhenAny在循环中使用该方法编写了一个基本实现,但是这种方法存在问题:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}

问题是性能。的实现创建了Task.WhenAny所提供任务列表的防御性副本,因此在循环中重复调用它会导致 O(n²) 计算复杂度。我幼稚的实现很难处理 10,000 个任务。我的机器上的开销将近 10 秒。我希望该方法几乎与 build-in 一样高效Task.WhenAll,可以轻松处理数十万个任务。我该如何改进WhenEach方法以使其表现得体?

4

5 回答 5

6

通过使用本文中的代码您可以实现以下功能:

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
   var inputTasks = tasks.ToList();

   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   {
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   }

   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   {
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   };

   foreach (var inputTask in inputTasks)
       inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

   return results;
}

然后更改您WhenEach的调用Interleaved代码

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;
        yield return await t;
    }
}

然后你可以WhenEach像往常一样打电话给你

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

我对 10k 个任务进行了一些基本的基准测试,并在速度方面提高了 5 倍。

于 2019-10-02T02:59:31.973 回答
5

您可以将 Channel 用作异步队列。每个任务完成后都可以写入通道。通道中的项目将通过ChannelReader.ReadAllAsync作为 IAsyncEnumerable 返回。

IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
                                           writer.TryWrite(x.Result)));
    _ = Task.WhenAll(continuations)
            .ContinueWith(t=>writer.Complete(t.Exception));

    return channel.Reader.ReadAllAsync();
}

当所有任务完成时writer.Complete(),调用关闭通道。

为了测试这一点,此代码生成具有递减延迟的任务。这应该以相反的顺序返回索引:

var tasks=Enumerable.Range(1,4)
                    .Select(async i=>
                    { 
                      await Task.Delay(300*(5-i));
                      return i;
                    });

await foreach(var i in Interleave(tasks))
{
     Console.WriteLine(i);

}

生产:

4
3
2
1
于 2019-10-02T09:20:18.537 回答
2

只是为了好玩,使用System.Reactiveand System.Interactive.Async

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
    => Observable.Merge(tasks.Select(t => t.ToObservable())).ToAsyncEnumerable()
于 2019-10-02T09:22:36.393 回答
1

我真的很喜欢Panagiotis 提供的解决方案,但仍然希望引发异常,就像在 JohanP 的解决方案中一样。

为了实现这一点,我们可以稍微修改一下,在任务失败时尝试关闭通道:

public IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    if (inputTasks == null)
    {
        throw new ArgumentNullException(nameof(inputTasks), "Task list must not be null.");
    }

    var channel = Channel.CreateUnbounded<T>();
    var channelWriter = channel.Writer;
    var inputTaskContinuations = inputTasks.Select(inputTask => inputTask.ContinueWith(completedInputTask =>
    {
        // Check whether the task succeeded or not
        if (completedInputTask.Status == TaskStatus.RanToCompletion)
        {
            // Write the result to the channel on successful completion
            channelWriter.TryWrite(completedInputTask.Result);
        }
        else
        {
            // Complete the channel on failure to immediately communicate the failure to the caller and prevent additional results from being returned
            var taskException = completedInputTask.Exception?.InnerException ?? completedInputTask?.Exception;
            channelWriter.TryComplete(taskException);
        }
    }));

    // Ensure the writer is closed after the tasks are all complete, and propagate any exceptions from the continuations
    _ = Task.WhenAll(inputTaskContinuations).ContinueWith(completedInputTaskContinuationsTask => channelWriter.TryComplete(completedInputTaskContinuationsTask.Exception));

    // Return the async enumerator of the channel so results are yielded to the caller as they're available
    return channel.Reader.ReadAllAsync();
}

这样做的明显缺点是遇到的第一个错误将结束枚举并阻止返回任何其他可能成功的结果。这是我的用例可以接受的权衡,但可能不适用于其他用例。

于 2020-06-04T21:05:43.417 回答
0

我要为这个问题再添加一个答案,因为有几个问题需要解决。

  1. 建议创建异步可枚举序列的方法应该有一个CancellationToken参数。这将启用循环WithCancellation配置。await foreach
  2. 建议当异步操作将延续附加到任务时,应在操作完成时清理这些延续。因此,例如,如果WhenEach方法的调用者决定提前退出await foreach循环(使用breakreturn),或者如果循环由于异常而提前终止,我们不想留下一堆死的延续,附加到任务。WhenEach如果在循环中重复调用 (例如,作为功能的一部分),这一点尤其重要Retry

下面的实现解决了这两个问题。它基于一个Channel<Task<TResult>>. 现在渠道已经成为 .NET 平台不可分割的一部分,因此没有理由避免使用基于更复杂TaskCompletionSource的解决方案。

public async static IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    var channel = Channel.CreateUnbounded<Task<TResult>>();
    using var completionCts = new CancellationTokenSource();
    var continuations = new List<Task>(tasks.Length);
    try
    {
        int pendingCount = tasks.Length;
        foreach (var task in tasks)
        {
            if (task == null) throw new ArgumentException(
                $"The tasks argument included a null value.", nameof(tasks));
            continuations.Add(task.ContinueWith(t =>
            {
                bool accepted = channel.Writer.TryWrite(t);
                Debug.Assert(accepted);
                if (Interlocked.Decrement(ref pendingCount) == 0)
                    channel.Writer.Complete();
            }, completionCts.Token, TaskContinuationOptions.ExecuteSynchronously |
                TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
        }

        await foreach (var task in channel.Reader.ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            yield return await task.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    finally
    {
        completionCts.Cancel();
        try { await Task.WhenAll(continuations).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
    }
}

finally块负责取消附加的延续,并在退出之前等待它们完成。

循环内部似乎是多余的,但实际上是必需的,因为该ThrowIfCancellationRequested方法的设计行为,这在此处进行了解释。await foreachReadAllAsync


注意: finally 块中的被低效的/块OperationCanceledException抑制。捕获异常是昂贵的。更有效的实现将通过使用专门的等待者(如答案中的特色)等待继续,并特殊处理案例来抑制错误。出于此答案的目的,修复这种低效率可能是矫枉过正。该方法不太可能在紧密循环中使用。trycatchSuppressExceptionIsCanceledWhenEach

于 2021-11-18T20:52:09.597 回答