221

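I want to process a collection in parallel, but I'm having trouble implementing it, so I'm hoping to get some help.

The trouble arises when I want to call a method marked async in C# inside the lambda of the parallel loop. For example: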

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

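The problem is that the count is 0, because all the threads created are effectively just background threads and the Parallel.ForEach call doesn't wait for them to complete. If I remove the async keyword, the method looks like this: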

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

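It works, but it completely defeats the cleverness of await, and I have to do some manual exception handling (removed for brevity).

How can I implement a Parallel.ForEach loop that uses the await keyword inside the lambda? Is it possible?

The prototype of the Parallel.ForEach method takes an Action<T> as its parameter, but I want it to wait for my asynchronous lambda.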
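
To make the symptom concrete, here is a minimal sketch of why the count tends to be 0. Because the parameter is an Action<T>, the async lambda compiles to an async void method, and each invocation returns to Parallel.ForEach at its first incomplete await. GetData below is a hypothetical stand-in with an artificial delay, and the Task.Delay at the end is only a crude way to show that the work finishes later.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for the question's GetData method.
    private static async Task<object> GetData(int item)
    {
        await Task.Delay(200);
        return item;
    }

    public static async Task<(int Before, int After)> RunAsync()
    {
        var bag = new ConcurrentBag<object>();
        var myCollection = Enumerable.Range(1, 5);

        // The async lambda is converted to Action<int>, i.e. an async void method.
        // Each invocation hands control back to Parallel.ForEach at its first await.
        Parallel.ForEach(myCollection, async item =>
        {
            var response = await GetData(item);
            bag.Add(response);
        });

        int before = bag.Count;   // Typically 0: no GetData call has finished yet.
        await Task.Delay(1000);   // Crude wait, only to show the work completes afterwards.
        int after = bag.Count;
        return (before, after);
    }
}
```

Nothing in this sketch observes exceptions thrown inside the lambda either, which is another reason async void lambdas are usually avoided.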


11 Answers

272

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.
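
For readers who cannot follow the link, here is a sketch in the spirit of that post, not a verbatim copy. It assumes the idea is to split the source into a fixed number of partitions and drain each partition sequentially inside its own task, so at most dop bodies run at once. The class name PartitionedForEach is made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedForEach
{
    // dop = degree of parallelism: the number of partitions drained concurrently.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(dop)
                .Select(partition => Task.Run(async () =>
                {
                    // Each partition is an IEnumerator<T> that must be disposed when drained.
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            await body(partition.Current);
                        }
                    }
                })));
    }
}
```

Usage would look like `await myCollection.ForEachAsync(4, async item => { ... });`. Unlike the Select/WhenAll version above, this never has more than dop operations in flight.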

answered 2013-02-28T13:30:44.903
119

You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
answered 2016-08-26T21:02:40.053
42

One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that lets you control the degree of parallelism:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

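var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
Directory.CreateDirectory(cacheDir); // File.OpenWrite does not create missing directories

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    // A raw URL contains characters such as ':' and '/' that are not valid in a file name
    var targetPath = Path.Combine(cacheDir, Uri.EscapeDataString(url));

    var response = await client.GetAsync(url, token);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target, token);
    }
});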

For another example, see Scott Hanselman's blog.
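
Applied to the scenario in the question, the same API might look like the sketch below. GetData is a hypothetical stand-in with an artificial delay, and the degree of parallelism of 4 is an arbitrary choice.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Hypothetical stand-in for the question's GetData method.
    private static async Task<object> GetData(int item, CancellationToken token)
    {
        await Task.Delay(10, token);
        return item * 2;
    }

    public static async Task<int> CountResultsAsync(IEnumerable<int> myCollection)
    {
        var bag = new ConcurrentBag<object>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        // The body returns a ValueTask, so the awaited loop completes
        // only after every body has run to completion.
        await Parallel.ForEachAsync(myCollection, options, async (item, token) =>
        {
            // some pre stuff
            var response = await GetData(item, token);
            bag.Add(response);
            // some post stuff
        });

        return bag.Count;
    }
}
```

Because the loop is awaited, bag.Count reflects every item by the time it is read, which is exactly what the original Parallel.ForEach attempt could not guarantee.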

answered 2021-08-24T04:26:25.457
31

You can use SemaphoreSlim to control the degree of parallelism.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
answered 2019-07-02T21:44:28.130
10

The simplest possible extension method, compiled from the other answers and the article referenced by the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
answered 2021-03-15T09:56:53.377
5

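My lightweight implementation of an async ParallelForEach.

Features:

  1. Throttling (maximum degree of parallelism).
  2. Exception handling (an aggregate exception is thrown on completion).
  3. Memory efficient (no need to store the list of tasks).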

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var exceptions = new ConcurrentBag<Exception>();

        foreach (T item in source)
        {
            // Wait for a free slot before starting the next operation.
            await semaphoreSlim.WaitAsync();
            _ = asyncAction(item).ContinueWith(t =>
            {
                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                // Release last, so a free slot implies the exception has been recorded.
                semaphoreSlim.Release();
            });
        }

        // Drain: once every slot has been re-acquired, no operation is still running.
        // This also returns immediately for an empty source, which would otherwise
        // wait forever on a completion signal that nothing sets.
        for (int i = 0; i < maxDegreeOfParallelism; i++)
        {
            await semaphoreSlim.WaitAsync();
        }

        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Usage example:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
answered 2018-10-17T11:01:56.637
1

I created an extension method for this that uses SemaphoreSlim and also allows setting the maximum degree of parallelism.

    /// <summary>
    /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the sequence</typeparam>
    /// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">An async delegate returning a <see cref="Task"/> to execute for each item</param>
    /// <param name="maxDegreeOfParallelism">Optional. An integer that represents the maximum degree of parallelism;
    /// must be greater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If <paramref name="maxDegreeOfParallelism"/> is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

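                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            // Awaiting the action directly lets its exceptions reach Task.WhenAll;
                            // awaiting a ContinueWith continuation would swallow them.
                            await action(item);
                        }
                        finally
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        }
                    }));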
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
answered 2018-05-09T22:39:16.617
1

In the accepted answer, the ConcurrentBag is not needed. Here is an implementation without it:

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

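Any "// some pre stuff" and "// some post stuff" can go into the GetData implementation (or into another method that calls GetData).

Besides being shorter, this avoids using an "async void" lambda, which is an anti-pattern.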
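
A slightly fuller sketch of the same idea follows, with the pre and post work moved into a wrapper. GetData and ProcessItem are hypothetical names with placeholder bodies. It also awaits Task.WhenAll for the result array directly instead of reading t.Result afterwards.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllResults
{
    // Hypothetical stand-in for the question's GetData method.
    private static async Task<string> GetData(int item)
    {
        await Task.Delay(10);
        return $"item-{item}";
    }

    // Hypothetical wrapper that holds the "pre" and "post" work around GetData.
    private static async Task<string> ProcessItem(int item)
    {
        // some pre stuff
        var response = await GetData(item);
        // some post stuff (placeholder transformation)
        return response.ToUpperInvariant();
    }

    public static async Task<string[]> ProcessAllAsync(IEnumerable<int> myCollection)
    {
        // Task.WhenAll over Task<T> yields the results in the same order as the input tasks.
        return await Task.WhenAll(myCollection.Select(ProcessItem));
    }
}
```

Note that this starts every operation at once, so it suits small collections; it does no throttling.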

answered 2021-01-18T10:30:23.080
1

With the introduction of .NET 6, Parallel.ForEachAsync is now available.

using System.Net.Http.Headers;
using System.Net.Http.Json;
 
var userHandlers = new []
{
    "users/okyrylchuk",
    "users/shanselman",
    "users/jaredpar",
    "users/davidfowl"
};
 
using HttpClient client = new()
{
    BaseAddress = new Uri("https://api.github.com"),
};
client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("DotNet", "6"));
 
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);
 
    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});
 
public class GitHubUser
{
    public string Name { get; set; }
    public string  Bio { get; set; }
}

See the full issue tracking on GitHub, as well as some usage examples by Scott Hanselman.

answered 2021-11-11T08:28:14.270
0

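The following is set up to work with IAsyncEnumerable, but it can be adapted to IEnumerable by changing the type and removing the await in front of the foreach. It is better suited to large amounts of data than creating countless parallel tasks and then awaiting them all.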

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }
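
The IEnumerable adaptation described above might look like the following sketch. It assumes the System.Threading.Tasks.Dataflow library is available. The class name DataflowForEach is made up, and the early exit when SendAsync returns false is an addition, not part of the original.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowForEach
{
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int maxDegreeOfParallelism,
        int? boundedCapacity = null)
    {
        var block = new ActionBlock<T>(
            action,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
            });

        foreach (T item in enumerable)
        {
            // SendAsync waits while the block is at capacity. It returns false once the
            // block no longer accepts items, for example after the action has faulted.
            if (!await block.SendAsync(item).ConfigureAwait(false))
            {
                break;
            }
        }

        block.Complete();
        // Rethrows any exception raised by the action.
        await block.Completion.ConfigureAwait(false);
    }
}
```

The bounded capacity is what keeps memory flat: the source is only enumerated as fast as the block can accept items.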
answered 2021-02-09T03:16:26.520
-1

For a simpler solution (I'm not sure whether it is optimal), you can simply nest Parallel.ForEach inside a Task, like this:

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    });
});

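ParallelOptions does the throttling for you out of the box.

I use this in a real-world scenario to run very long operations in the background. The operations are invoked over HTTP, and the design ensures that the HTTP call is not blocked while the long operation runs.

  1. An HTTP call requests the long-running background operation.
  2. The operation starts in the background.
  3. The user gets a status ID that can be used to check the status with another HTTP call.
  4. The background operation updates its status.

This way, the CI/CD call doesn't time out because of a long HTTP operation; instead it polls the status every x seconds without blocking the process.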
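
The four steps above could be sketched roughly as follows, leaving out the HTTP layer. Everything here is illustrative: the BackgroundOperations class, the in-memory status dictionary, and the status strings are assumptions, not the code actually used in that system.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BackgroundOperations
{
    // Status per operation ID; the string values are illustrative.
    private static readonly ConcurrentDictionary<Guid, string> Statuses = new();

    // Steps 1-3: start the work in the background and return a status ID immediately.
    public static Guid Start<T>(IEnumerable<T> myCollection, Action<T> doWork)
    {
        var id = Guid.NewGuid();
        Statuses[id] = "Running";
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

        _ = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(myCollection, options, doWork);
                Statuses[id] = "Completed"; // Step 4: the operation updates its own status.
            }
            catch (Exception)
            {
                Statuses[id] = "Failed";
            }
        });

        return id;
    }

    // Called by the status endpoint each time the client polls.
    public static string GetStatus(Guid id) =>
        Statuses.TryGetValue(id, out var status) ? status : "Unknown";
}
```

A real service would also need to expire old entries and survive process restarts, which an in-memory dictionary does not.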

answered 2021-02-22T19:29:16.573