4

我有两种方法连接到两个不同的Foos 源,它们返回两个 s IAsyncEnumerable<Foo>。我需要Foo从两个来源获取所有 s 才能处理它们。

问题:我想同时(异步)查询两个源,即。不等Source1枚举完成才开始枚举Source2。据我了解,这就是SequentialSourcesQuery下面的方法示例中发生的情况,对吗?

对于常规任务,我会启动第一个任务,然后是第二个任务,然后调用await Task.WhenAll. 但是我对如何处理有点困惑IAsyncEnumerable

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        List<Foo> foos = new List<Foo>();

        await foreach (Foo foo1 in Source1())
        {
            foos.Add(foo1);
        }

        await foreach (Foo foo2 in Source2())
        { //doesn't start until Source1 completed the enumeration? 
            foos.Add(foo2);
        }

        return foos;
    }
}
4

3 回答 3

5

您可以利用System.Linq.AsyncSystem.Interactive.Async库(由属于 .NET 基金会的RxTeam拥有)。它们包含类似Mergeand的运算符ToListAsync,可以轻松解决您的问题。

// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
    this IAsyncEnumerable<TSource> source,
    CancellationToken cancellationToken = default);

把所有东西放在一起:

public Task<List<Foo>> SequentialSourcesQuery()
{
    return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}

通过意识到这些库专注于提供丰富的功能集,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的解决方案很可能会胜过上述基于运算符的方法。

于 2021-02-11T18:10:55.213 回答
2

如果您有两个IAsyncEnumerable<T>作为源并且不关心传入数据的顺序,则可以使用如下方法来交错数据。

public static class AsyncEnumerableExt
{
    public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first, IAsyncEnumerable<T> second)
    {
        var enum1 = first.GetAsyncEnumerator();
        var enum2 = second.GetAsyncEnumerator();

        var nextWait1 = enum1.MoveNextAsync().AsTask();
        var nextWait2 = enum2.MoveNextAsync().AsTask();

        do
        {
            var task = await Task.WhenAny(nextWait1, nextWait2).ConfigureAwait(false);

            if (task == nextWait1)
            {
                yield return enum1.Current;

                nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
            }
            else if (task == nextWait2)
            {
                yield return enum2.Current;

                nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
            }
        } while (nextWait1 != null && nextWait2 != null);

        while (nextWait1 != null)
        {
            if (!await nextWait1.ConfigureAwait(false))
            {
                nextWait1 = null;
            }
            else
            {
                yield return enum1.Current;
                nextWait1 = enum1.MoveNextAsync().AsTask();
            }
        }

        while (nextWait2 != null)
        {
            if (!await nextWait2.ConfigureAwait(false))
            {
                nextWait2 = null;
            }
            else
            {
                yield return enum2.Current;
                nextWait2 = enum2.MoveNextAsync().AsTask();
            }
        }
    }
}

然后您可以使用一个数据await foreach并将数据存储在列表中。

于 2021-02-11T13:28:03.947 回答
1

您可以编写另一个返回 Task 的异步本地方法。

Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
        List<Foo> foos = new List<Foo>();
        await foreach (Foo foo1 in values)
        {
            foos.Add(foo1);
        }        
        return foos;
};

并这样称呼它:

Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());

await Task.WhenAll(task1, task2);

整个代码将是:

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        var asyncEnumerator = Source1().GetAsyncEnumerator();
        Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
            List<Foo> foos2 = new List<Foo>();
            await foreach (Foo foo in values)
            {
                foos2.Add(foo);
            }        
            return foos2;
        };
        
        Task<List<Foo>> task1 = readValues(Source1());
        Task<List<Foo>> task2 = readValues(Source2());
        
        await Task.WhenAll(task1, task2);
        
        List<Foo> foos = new List<Foo>(task1.Result.Count + task2.Result.Count);
        foos.AddRange(task1.Result);
        foos.AddRange(task2.Result);

        return foos;
    }
}
于 2021-02-11T10:28:22.067 回答