12

I'm tinkering around with the new IAsyncEnumerable<T> stuff in C# 8.0. Let's say I've got some method somewhere that I want to consume:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }

I'm aware that I can use it with the await foreach... syntax. But let's say my consumer needs to have all results from this function before it continues. What's the best syntax to await all results before continuing? In other words, I'd like to be able to do something like:

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

What's the correct way to do this?

4

3 回答 3

16

首先是一个警告:根据定义,异步流可​​能永远不会结束并继续产生结果,直到应用程序终止。这已经在例如 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。

在异步流上使用ToListAsync可能会产生意想不到的后果。


System.Linq.Async包已经提供了这样的运算符。

通过ToListAsync可以使用整个流。代码*看似简单,但隐藏了一些有趣的问题:

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
    if (source == null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            list.Add(item);
        }

        return list;
    }
}

首先,它返回一个ValueTask. 其次,它确保观察到取消并ConfigureAwait(false)用于防止死锁。最后,如果源已经提供了自己的ToListAsync实现,那么运营商就会遵从它。

于 2019-11-18T13:25:41.730 回答
4

根据@DmitryBychenko 的评论,我写了一个扩展来做我想要的:

    public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
    {
        if (null == asyncEnumerable)
            throw new ArgumentNullException(nameof(asyncEnumerable));  

        var list = new List<T>();
        await foreach (var t in asyncEnumerable)
        {
            list.Add(t);
        }

        return list;
    }

我只是有点惊讶这不是 C# 8.0 自带的……这似乎是一个非常明显的需求。

于 2019-11-18T13:11:58.030 回答
1

作为一个选项,您可以使用ToArrayAsync扩展方法,在System.Linq.Async包中定义

public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)

根据定义,它扩展了IAsyncEnumerable接口

于 2019-11-18T13:35:20.873 回答