7

我在测试 C# 8.0 功能时遇到了 IAsyncEnumerable。我从 Anthony Chu ( https://anthonychu.ca/post/async-streams-dotnet-core-3-iasyncenumerable/ ) 那里找到了一些非凡的例子。它是异步流和替代Task<IEnumerable<T>>

// Data Access Layer.
public async IAsyncEnumerable<Product> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    {
        foreach (var product in await iterator.ReadNextAsync())
        {
            yield return product;
        }
    }
}

// Usage
await foreach (var product in productsRepository.GetAllProducts())
{
    Console.WriteLine(product);
}

我想知道这是否可以应用于读取文本文件,如下面的逐行读取文件的用法。

foreach (var line in File.ReadLines("Filename"))
{
    // ...process line.
}

我真的很想知道如何将 async withIAsyncEnumerable<string>()应用于上述 foreach 循环,以便它在阅读时流式传输。

如何实现迭代器,以便可以使用 yield return 逐行读取?

4

2 回答 2

6

完全一样,但是没有异步工作负载,所以让我们假设

public async IAsyncEnumerable<string> SomeSortOfAwesomeness()
{
   foreach (var line in File.ReadLines("Filename.txt"))
   {
       // simulates an async workload, 
       // otherwise why would be using IAsyncEnumerable?
       // -- added due to popular demand 
       await Task.Delay(100);
       yield return line;
   }
}

或者

这只是一个打包的 APM 工作负载,请参阅Stephen Clearys的评论以进行澄清

public static async IAsyncEnumerable<string> SomeSortOfAwesomeness()
{
   using StreamReader reader = File.OpenText("Filename.txt");
   while(!reader.EndOfStream)
      yield return await reader.ReadLineAsync();
}

用法

await foreach(var line in SomeSortOfAwesomeness())
{
   Console.WriteLine(line);
}

斯蒂芬·克利里更新

File.OpenText遗憾的是只允许同步 I/O;在这种情况下,异步 API的实现很差。要打开真正的异步文件,您需要使用FileStream构造函数传递isAsync:true 或 FileOptions.Asynchronous.

ReadLineAsync基本上会导致这段代码,如您所见,它只是 Stream APMBeginEnd包装的方法

private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{            
     return TaskFactory<Int32>.FromAsyncTrim(
                    this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
                    (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                    (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}
于 2019-11-26T03:24:33.317 回答
1

我做了一些性能测试,似乎一个大bufferSize的有帮助,还有这个FileOptions.SequentialScan选项。

public static async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
    using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
        FileShare.Read, 32768, FileOptions.Asynchronous | FileOptions.SequentialScan);
    using var reader = new StreamReader(stream);
    while (true)
    {
        var line = await reader.ReadLineAsync().ConfigureAwait(false);
        if (line == null) break;
        yield return line;
    }
}

虽然枚举并不是真正的异步。根据我的实验,该类的xxxAsync方法StreamReader阻塞当前线程的时间长于Task它们返回的等待时间。例如,ReadToEndAsync在我的 PC 中使用该方法读取 6 MB 文件会在返回任务之前阻塞当前线程 120 毫秒,然后任务仅在 20 毫秒内完成。所以我不确定使用这些方法有多大价值。通过将同步 API 与一些Linq.Async一起使用,伪造异步要容易得多:

IAsyncEnumerable<string> lines = File.ReadLines("SomeFile.txt").ToAsyncEnumerable();
于 2020-02-27T23:48:53.240 回答