27

我有一个这样的异步谓词方法:

private async Task<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

假设我有一个Uris 集合:

var addresses = new[]
{
    new Uri("http://www.google.com/"),
    new Uri("http://www.stackoverflow.com/") //etc.
};

我想addresses使用MeetsCriteria. 我想异步执行此操作;我希望对谓词的多次调用异步运行,然后我想等待它们全部完成并生成过滤后的结果集。不幸的是,LINQ 似乎不支持异步谓词,所以这样的事情不起作用

var filteredAddresses = addresses.Where(MeetsCriteria);

有没有类似方便的方法来做到这一点?

4

5 回答 5

21

我认为框架中没有这样的原因之一是存在很多可能的变化,并且在某些情况下每个选择都是正确的:

  • 谓词应该并行执行还是串行执行?
    • 如果它们并行执行,是应该全部同时执行,还是应该限制并行度?
    • 如果它们并行执行,结果应该与原始集合的顺序相同、完成顺序还是未定义顺序?
      • 如果它们应该按完成的顺序返回,是否应该有某种方法(异步)在它们完成时获取结果?(这需要将返回类型从更改为Task<IEnumerable<T>>其他类型。)

您说您希望谓词并行执行。在这种情况下,最简单的选择是一次执行它们并按完成顺序返回它们:

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    var results = new ConcurrentQueue<T>();
    var tasks = source.Select(
        async x =>
        {
            if (await predicate(x))
                results.Enqueue(x);
        });
    await Task.WhenAll(tasks);
    return results;
}

然后你可以像这样使用它:

var filteredAddresses = await addresses.Where(MeetsCriteria);
于 2013-02-15T12:56:52.597 回答
9

第一种方法:一个接一个地发出所有请求,然后等待所有请求返回,然后过滤结果。(svick 的代码也这样做了,但这里我是在没有中间 ConcurrentQueue 的情况下这样做的)。

// First approach: massive fan-out
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) });
var addressesAndCriteria = await Task.WhenAll(tasks);
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A);

第二种方法:一个接一个地执行请求。这将需要更长的时间,但它会确保不会用大量的请求来打击 web 服务(假设 MeetsCriteriaAsync 会发送到 web 服务......)

// Second approach: one by one
var filteredAddresses = new List<Uri>();
foreach (var a in filteredAddresses)
{
  if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a);
}

第三种方法:至于第二种方法,但使用假设的 C#8 功能“异步流”。C#8 尚未推出,异步流尚未设计,但我们可以梦想!RX 中已经存在 IAsyncEnumerable 类型,希望他们会为它添加更多组合子。IAsyncEnumerable 的好处是我们可以在前几个filteredAddresses 到来后立即开始使用它们,而不是等待所有内容都被过滤掉。

// Third approach: ???
IEnumerable<Uri> addresses = {...};
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync);

第四种方法:也许我们不想一次用所有请求敲击 Web 服务,但我们很乐意一次发出多个请求。也许我们做了实验,发现“一次三个”是一种快乐的媒介。注意:此代码假定在 UI 编程或 ASP.NET 等单线程执行上下文中。如果它在多线程执行上下文中运行,那么它需要一个 ConcurrentQueue 和 ConcurrentList 来代替。

// Fourth approach: throttle to three-at-a-time requests
var addresses = new Queue<Uri>(...);
var filteredAddresses = new List<Uri>();
var worker1 = FilterAsync(addresses, filteredAddresses);
var worker2 = FilterAsync(addresses, filteredAddresses);
var worker3 = FilterAsync(addresses, filteredAddresses);
await Task.WhenAll(worker1, worker2, worker3);

async Task FilterAsync(Queue<Uri> q, List<Uri> r)
{
  while (q.Count > 0)
  {
    var item = q.Dequeue();
    if (await MeetsCriteriaAsync(item)) r.Add(item);
  }
}

也有使用 TPL 数据流库的第四种方法的方法。

于 2016-11-16T17:36:46.070 回答
8

我认为这比不使用任何并发队列的公认答案更简单。

public static async Task<IEnumerable<T>> Where<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    var results = await Task.WhenAll(source.Select(async x => (x, await predicate(x))));
    return results.Where(x => x.Item2).Select(x => x.Item1);
}
于 2018-05-09T01:46:02.993 回答
6

考虑到框架的较新版本和IAsyncEnumerable<T>界面的采用,我不再在这里建议任何其他高度自定义的答案,因为它们在很大程度上是不必要的。

LINQ的System.Linq.Async异步版本可通过NuGet 包获得。

这是进行异步检查的语法:

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(async x => await MeetsCriteria(x));

filteredAddresses将是 type IAsyncEnumerable<int>,可以是:

  • ToListAsync,FirstAsync等具体化
  • 迭代await foreach

要获得与以前相同的效果并允许使用方法组进行调用,您可以将返回类型更改MeetsCriteriaValueTask

private async ValueTask<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

...

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(MeetsCriteria);

我不建议ValueTask只使用来保存几个字符,因为它应该进行基准测试并用于性能/内存原因。

于 2021-06-30T23:17:06.483 回答
5

我会使用下面的方法而不是使用ConcurrentBagorConcurrentQueue

public static async IAsyncEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    foreach(var item in source)
    {
        if(await (predicate(item)))
        {
            yield return item;
        }
    }
}

例如

    var result =  numbers.WhereAsync(async x =>
                                               await IsEvenAsync(x));
    await foreach (var x in result)
    {
        Console.Write($"{x},");
    }
于 2020-10-31T11:10:04.753 回答