169

使用 ForEach 时是否可以使用异步?下面是我正在尝试的代码:

using (DataContext db = new DataLayer.DataContext())
{
    db.Groups.ToList().ForEach(i => async {
        await GetAdminsFromGroup(i.Gid);
    });
}

我收到错误消息:

当前上下文中不存在名称“异步”

包含 using 语句的方法设置为异步。

4

10 回答 10

241

List<T>.ForEach不能很好地配合async(LINQ-to-objects 也没有,出于同样的原因)。

在这种情况下,我建议将每个元素投射到异步操作中,然后您可以(异步)等待它们全部完成。

using (DataContext db = new DataLayer.DataContext())
{
    var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
    var results = await Task.WhenAll(tasks);
}

async与委托委托相比,这种方法的好处ForEach是:

  1. 错误处理更合适。async void不能用catch;捕获异常 这种方法将await Task.WhenAll在线路上传播异常,允许自然的异常处理。
  2. 您知道在此方法结束时任务已完成,因为它执行await Task.WhenAll. 如果使用async void,则无法轻易判断操作何时完成。
  3. 这种方法具有用于检索结果的自然语法。GetAdminsFromGroupAsync听起来这是一个产生结果的操作(管理员),如果这样的操作可以返回它们的结果而不是设置一个值作为副作用,那么这样的代码会更自然。
于 2013-09-07T01:22:21.717 回答
74

这个小扩展方法应该为您提供异常安全的异步迭代:

public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
{
    foreach (var value in list)
    {
        await func(value);
    }
}

由于我们将 lambda 的返回类型从 更改voidTask,因此异常将正确传播。这将允许您在实践中编写如下内容:

await db.Groups.ToList().ForEachAsync(async i => {
    await GetAdminsFromGroup(i.Gid);
});
于 2015-03-11T20:38:59.023 回答
30

从 开始C# 8.0,您可以异步创建和使用流。

    private async void button1_Click(object sender, EventArgs e)
    {
        IAsyncEnumerable<int> enumerable = GenerateSequence();

        await foreach (var i in enumerable)
        {
            Debug.WriteLine(i);
        }
    }

    public static async IAsyncEnumerable<int> GenerateSequence()
    {
        for (int i = 0; i < 20; i++)
        {
            await Task.Delay(100);
            yield return i;
        }
    }

更多的

于 2019-09-26T07:53:53.763 回答
27

简单的答案是使用foreach关键字而不是ForEach()方法 of List()

using (DataContext db = new DataLayer.DataContext())
{
    foreach(var i in db.Groups)
    {
        await GetAdminsFromGroup(i.Gid);
    }
}
于 2019-04-04T12:23:09.557 回答
13

这是上述异步 foreach 变体的实际工作版本,具有顺序处理:

public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
{
    foreach (var item in enumerable)
        await Task.Run(() => { action(item); }).ConfigureAwait(false);
}

这是实现:

public async void SequentialAsync()
{
    var list = new List<Action>();

    Action action1 = () => {
        //do stuff 1
    };

    Action action2 = () => {
        //do stuff 2
    };

    list.Add(action1);
    list.Add(action2);

    await list.ForEachAsync();
}

关键区别是什么?.ConfigureAwait(false);它在每个任务的异步顺序处理时保持主线程的上下文。

于 2017-03-03T03:26:39.480 回答
3

问题是async关键字需要出现在 lambda 之前,而不是在正文之前:

db.Groups.ToList().ForEach(async (i) => {
    await GetAdminsFromGroup(i.Gid);
});
于 2013-09-06T22:48:38.777 回答
2

添加此扩展方法

public static class ForEachAsyncExtension
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop) 
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ConfigureAwait(false);
            }));
    }
}

然后像这样使用:

Task.Run(async () =>
{
    var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
    var buckets = await s3.ListBucketsAsync();

    foreach (var s3Bucket in buckets.Buckets)
    {
        if (s3Bucket.BucketName.StartsWith("mybucket-"))
        {
            log.Information("Bucket => {BucketName}", s3Bucket.BucketName);

            ListObjectsResponse objects;
            try
            {
                objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
                continue;
            }

            // ForEachAsync (4 is how many tasks you want to run in parallel)
            await objects.S3Objects.ForEachAsync(4, async s3Object =>
            {
                try
                {
                    log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
                    await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
                }
                catch
                {
                    log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
                }
            });

            try
            {
                await s3.DeleteBucketAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
            }
        }
    }
}).Wait();
于 2016-07-26T22:35:13.040 回答
1

如果您使用的是 EntityFramework.Core ,则有一个扩展方法 ForEachAsync

示例用法如下所示:

using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;

public class Example
{
    private readonly DbContext _dbContext;
    public Example(DbContext dbContext)
    {
        _dbContext = dbContext;
    }
    public async void LogicMethod()
    {
        
        await _dbContext.Set<dbTable>().ForEachAsync(async x =>
        {
            //logic
            await AsyncTask(x);
        });
    }

    public async Task<bool> AsyncTask(object x)
    {
        //other logic
        return await Task.FromResult<bool>(true);
    }
}
于 2021-08-24T12:26:07.877 回答
-1

我想补充一点,有一个内置 ForEach 函数的Parallel 类可用于此目的。

于 2021-01-02T14:47:50.913 回答
-2

这是我创建的用于处理异步场景的方法ForEach

  • 如果其中一项任务失败,则其他任务将继续执行。
  • 您可以添加将在每个异常上执行的函数。
  • 异常在最后被收集为 aggregateException 并且可供您使用。
  • 可以处理 CancellationToken
 public static class ParallelExecutor
    {
        /// <summary>
        /// Executes asynchronously given function on all elements of given enumerable with task count restriction.
        /// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
        /// </summary>
        /// <typeparam name="T">Type of elements in enumerable</typeparam>
        /// <param name="maxTaskCount">The maximum task count.</param>
        /// <param name="enumerable">The enumerable.</param>
        /// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
        /// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
        {
            using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);

            // This `lockObject` is used only in `catch { }` block.
            object lockObject = new object();
            var exceptions = new List<Exception>();
            var tasks = new Task[enumerable.Count()];
            int i = 0;

            try
            {
                foreach (var t in enumerable)
                {
                    await semaphore.WaitAsync(cancellationToken);
                    tasks[i++] = Task.Run(
                        async () =>
                        {
                            try
                            {
                                await asyncFunc(t);
                            }
                            catch (Exception e)
                            {
                                if (onException != null)
                                {
                                    lock (lockObject)
                                    {
                                        onException.Invoke(e);
                                    }
                                }

                                // This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
                                throw;
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        }, cancellationToken);

                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException e)
            {
                exceptions.Add(e);
            }

            foreach (var t in tasks)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                // Exception handling in this case is actually pretty fast.
                // https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
                try
                {
                    await t;
                }
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
                catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
                {
                    exceptions.Add(e);
                }
            }

            if (exceptions.Any())
            {
                throw new AggregateException(exceptions);
            }
        }
    }
于 2020-12-14T21:33:38.067 回答