7

我有一个异步方法,比如:

public async Task<T> GetAsync()
{

}

并将从以下位置调用:

public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

上面的语法无效,但基本上我在使用异步生成器。我知道它可以通过 Observables 处理。我确实对 Rx.NET 进行了实验,它在一定程度上起作用。但我试图避免它给代码库带来的复杂性,更重要的是,上述要求本质上仍然不是一个反应式系统(我们的仍然是基于拉的)。例如,我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。

我可以像这样反转方法签名:

public IEnumerable<Task<T>> GetAllAsync()

但这使得在不阻塞的情况下进行 LINQ 操作有点棘手。我希望它是非阻塞的,并且不将整个东西加载到内存中。这个库:AsyncEnumerable完全符合我的要求,但如何使用Ix.NET完成相同的操作?我相信它们是为了同样的事情。

换句话说,我如何利用 Ix.NETIAsyncEnumerable在处理时生成一个await?喜欢,

public async IAsyncEnumerable GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}
4

1 回答 1

5

(已编辑)

使用来自 NuGet 的 System.Linq.Async 4.0.0,现在您可以使用SelectAwait.

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something
            .ToAsyncEnumerable()
            .SelectAwait(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

(过时的)

使用来自 NuGet 的 System.Interactive.Async 3.2.0,怎么样?目前Select()不支持 async lambda,需要自己实现。

更好地支持异步 - AsyncEnumerable 的基于任务的重载

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something.SelectAsync(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var enumerator = enumerable.GetEnumerator();
            var current = default(TResult);
            return AsyncEnumerable.CreateEnumerator(async c =>
                {
                    var moveNext = enumerator.MoveNext();
                    current = moveNext
                        ? await selector(enumerator.Current).ConfigureAwait(false)
                        : default(TResult);
                    return moveNext;
                },
                () => current,
                () => enumerator.Dispose());
        });
    }
}

此示例引用了扩展方法。https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972

于 2018-08-30T22:56:21.143 回答