1
public async IAsyncEnumerable<Entity> FindByIds(List<string> ids)
    {
        List<List<string>> splitIdsList = ids.Split(5);

        var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();

        foreach (var entities in entityList)
        {
            await foreach (var entity in entities)
            {
                yield return entity;
            }
        }
    }

private async IAsyncEnumerable<Entity> FindByIdsQuery(List<string> ids)
    {
        var result = await Connection.QueryAsync(query, new {ids})

        foreach (var entity in result)
        {
            yield return entity;
        }
    }

如果我向这个函数发送 25 个 ID。第一个 FindByIdsQuery 需要 5000 毫秒。其他 4 个 FindByIdsQuery 需要 100 毫秒。然后这个解决方案不会输出任何实体,直到 5000 毫秒之后。是否有任何解决方案可以在有任何人输出时立即开始输出实体。或者,如果您可以在 Task 中执行类似操作,则使用Task.WhenAny.

需要明确的是:5 个查询中的任何一个都可能需要 5000 毫秒。

4

2 回答 2

5

从您的评论中,我理解了您的问题。您基本上要寻找的是某种“ SelectMany”运算符。该运算符将开始等待所有的IAsyncEnumerables并按它们来的顺序返回项目,而不管源异步枚举的顺序是什么。

我希望,默认值AsyncEnumerable.SelectMany会这样做,但我发现这不是真的。它遍历源枚举,然后遍历整个内部枚举,然后继续下一个。所以我破解了同时SelectMany正确等待所有内部异步枚举的变体。请注意,我不保证正确性,也不保证安全。零错误处理。

/// <summary>
/// Starts all inner IAsyncEnumerable and returns items from all of them in order in which they come.
/// </summary>
public static async IAsyncEnumerable<TItem> SelectManyAsync<TItem>(IEnumerable<IAsyncEnumerable<TItem>> source)
{
    // get enumerators from all inner IAsyncEnumerable
    var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList();

    List<Task<(IAsyncEnumerator<TItem>, bool)>> runningTasks = new List<Task<(IAsyncEnumerator<TItem>, bool)>>();

    // start all inner IAsyncEnumerable
    foreach (var asyncEnumerator in enumerators)
    {
        runningTasks.Add(MoveNextWrapped(asyncEnumerator));
    }

    // while there are any running tasks
    while (runningTasks.Any())
    {
        // get next finished task and remove it from list
        var finishedTask = await Task.WhenAny(runningTasks);
        runningTasks.Remove(finishedTask);

        // get result from finished IAsyncEnumerable
        var result = await finishedTask;
        var asyncEnumerator = result.Item1;
        var hasItem = result.Item2;

        // if IAsyncEnumerable has item, return it and put it back as running for next item
        if (hasItem)
        {
            yield return asyncEnumerator.Current;

            runningTasks.Add(MoveNextWrapped(asyncEnumerator));
        }
    }

    // don't forget to dispose, should be in finally
    foreach (var asyncEnumerator in enumerators)
    {
        await asyncEnumerator.DisposeAsync();
    }
}

/// <summary>
/// Helper method that returns Task with tuple of IAsyncEnumerable and it's result of MoveNextAsync.
/// </summary>
private static async Task<(IAsyncEnumerator<TItem>, bool)> MoveNextWrapped<TItem>(IAsyncEnumerator<TItem> asyncEnumerator)
{
    var res = await asyncEnumerator.MoveNextAsync();
    return (asyncEnumerator, res);
}

然后,您可以使用它来合并所有可枚举而不是第一个 foreach:

    var entities = SelectManyAsync(splitIdsList.Select(x => FindByIdsQuery(x)));

    return entities;
于 2020-02-17T09:22:42.510 回答
1

问题是您的代码让他们等待。这里的异步 foreach 没有意义,因为 - 你不做异步。

你来做这件事:

var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();

这是可以异步运行的查询部分,但不是因为您将整个结果集具体化为一个列表。然后你继续异步循环它,但此时所有结果都已经在内存中。

获得异步的方法很简单,就是去掉 ToList。将查询转储到 foreach 中,不要将其具体化到内存中。async foreach 应该命中 ef 级别的查询(而不是查询结果),这样您就可以在从数据库中获取信息时对其进行处理。ToList 有效地绕过了这一点。

还要了解 EF 无法有效地处理多个 id 查找。唯一可能的方法是将它们放入一个数组并包含,这是一个 SQL“IN”子句——对于较大的数字来说效率非常低,因为它强制进行表扫描。有效的 SQL 方法是将它们加载到具有统计信息的表值变量中并使用连接,但在 EF 中无法做到这一点 - 这是限制之一。长 IN 子句的 SQL 限制有据可查。EF方面的限制没有,但它们仍然存在。

于 2020-02-17T08:41:53.247 回答