20

我现有的代码类似于:

IEnumerable<SomeClass> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(connectionString))
    using (SqlCommand cmd = new SqlCommand(sql, conn)
    {
        conn.Open();
        SqlDataReader reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            SomeClass someClass = f(reader); // create instance based on returned row
            yield return someClass;
        }
    } 
}

看来我可以通过使用reader.ReadAsync(). 但是,如果我只修改一行:

        while (await reader.ReadAsync())

编译器通知我await只能在标有 的方法中使用async,并建议我将方法签名修改为:

async Task<IEnumerable<SomeClass>> GetStuff()

但是,这样做会导致GetStuff()无法使用,因为:

的主体GetStuff()不能是迭代器块,因为Task<IEnumerable<SomeClass>>它不是迭代器接口类型。

我确定我错过了异步编程模型的一个关键概念。

问题:

  • 我可以ReadAsync()在我的迭代器中使用吗?如何?
  • 我如何才能以不同的方式思考异步范式,以便我了解它在这种情况下的工作原理?
4

4 回答 4

21

问题是你所问的实际上并没有多大意义。IEnumerable<T>是一个同步接口,返回Task<IEnumerable<T>>不会对你有太大帮助,因为无论如何,一些线程必须阻塞等待每个项目。

您真正想要返回的是一些异步替代方案IEnumerable<T>:例如IObservable<T>来自 TPL Dataflow 的数据流块或IAsyncEnumerable<T>计划添加到 C# 8.0/.Net Core 3.0 的数据流块。(与此同时,有一些 包含它。)

使用 TPL 数据流,一种方法是:

ISourceBlock<SomeClass> GetStuff() {
    var block = new BufferBlock<SomeClass>();

    Task.Run(async () =>
    {
        using (SqlConnection conn = new SqlConnection(connectionString))
        using (SqlCommand cmd = new SqlCommand(sql, conn))
        {
            await conn.OpenAsync();
            SqlDataReader reader = await cmd.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                SomeClass someClass;
                // Create an instance of SomeClass based on row returned.
                block.Post(someClass);
            }
            block.Complete();
        } 
    });

    return block;
}

您可能希望在上面的代码中添加错误处理,但除此之外,它应该可以工作并且它将是完全异步的。

然后,您的其余代码也将异步使用返回块中的项目,可能使用ActionBlock.

于 2012-10-30T22:45:27.337 回答
20

不,您目前不能将 async 与迭代器块一起使用。正如 svick 所说,你需要类似的东西IAsyncEnumerable来做到这一点。

如果您有返回值Task<IEnumerable<SomeClass>>,则意味着该函数返回一个Task对象,一旦完成,将为您提供一个完整的 IEnumerable(此枚举中没有任务异步空间)。一旦任务对象完成,调用者应该能够同步遍历它在可枚举中返回的所有项目。

这是一个返回的解决方案Task<IEnumerable<SomeClass>>。通过执行以下操作,您可以获得异步的很大一部分好处:

async Task<IEnumerable<SomeClass>> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(""))
    {
        using (SqlCommand cmd = new SqlCommand("", conn))
        {
            await conn.OpenAsync();
            SqlDataReader reader = await cmd.ExecuteReaderAsync();
            return ReadItems(reader).ToArray();
        }
    }
}

IEnumerable<SomeClass> ReadItems(SqlDataReader reader)
{
    while (reader.Read())
    {
        // Create an instance of SomeClass based on row returned.
        SomeClass someClass = null;
        yield return someClass;
    }
}

...以及一个示例用法:

async void Caller()
{
    // Calls get-stuff, which returns immediately with a Task
    Task<IEnumerable<SomeClass>> itemsAsync = GetStuff();
    // Wait for the task to complete so we can get the items
    IEnumerable<SomeClass> items = await itemsAsync;
    // Iterate synchronously through the items which are all already present
    foreach (SomeClass item in items)
    {
        Console.WriteLine(item);
    }
}

在这里,您将迭代器部分和异步部分放在不同的函数中,这允许您同时使用 async 和 yield 语法。该GetStuff函数异步获取数据,ReadItems然后将数据同步读取到可枚举中。

注意ToArray()通话。像这样的东西是必要的,因为枚举器函数延迟执行,因此您的异步函数可能会在读取所有数据之前处理连接和命令。这是因为using块覆盖了Task执行的持续时间,但是您将对其进行迭代after以完成任务。

此解决方案不使用ReadAsync确实使用OpenAsyncand ExecuteReaderAsync,这可能会给您带来大部分好处。以我的经验,ExecuteReader 将花费最多的时间并且最大的好处是异步。当我读完第一行时,SqlDataReader已经有所有其他行并且ReadAsync只是同步返回。如果您也是这种情况,那么迁移到基于推送的系统(如IObservable<T>(这将需要对调用函数进行重大修改))不会获得显着的好处。

为了说明,请考虑解决同一问题的另一种方法:

IEnumerable<Task<SomeClass>> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(""))
    {
        using (SqlCommand cmd = new SqlCommand("", conn))
        {
            conn.Open();
            SqlDataReader reader = cmd.ExecuteReader();
            while (true)
                yield return ReadItem(reader);
        }
    }
}

async Task<SomeClass> ReadItem(SqlDataReader reader)
{
    if (await reader.ReadAsync())
    {
        // Create an instance of SomeClass based on row returned.
        SomeClass someClass = null;
        return someClass;
    }
    else
        return null; // Mark end of sequence
}

...以及一个示例用法:

async void Caller()
{
    // Synchronously get a list of Tasks
    IEnumerable<Task<SomeClass>> items = GetStuff();
    // Iterate through the Tasks
    foreach (Task<SomeClass> itemAsync in items)
    {
        // Wait for the task to complete. We need to wait for 
        // it to complete before we can know if it's the end of
        // the sequence
        SomeClass item = await itemAsync;
        // End of sequence?
        if (item == null) 
            break;
        Console.WriteLine(item);
    }
}

在这种情况下,GetStuff立即返回一个可枚举项,其中可枚举项中的每个项目都是一个任务,SomeClass当它完成时将呈现一个对象。这种方法有一些缺陷。首先,可枚举是同步返回的,所以在它返回时我们实际上不知道结果中有多少行,这就是为什么我将它设为无限序列的原因。这是完全合法的,但它有一些副作用。我需要使用null表示无限任务序列中有用数据的结束。其次,你必须小心你如何迭代它。您需要向前迭代它,并且在迭代到下一行之前需要等待每一行。您还必须仅在所有任务完成后处理迭代器,以便 GC 在完成使用之前不会收集连接。由于这些原因,这不是一个安全的解决方案,我必须强调,我将其包括在内是为了帮助回答您的第二个问题。

于 2012-10-31T00:17:34.190 回答
2

在我的经验中,严格地说异步迭代器(或有可能),SqlCommand我注意到代码的同步版本大大优于它的async对应版本。在速度和内存消耗方面。

也许,对这个观察持保留态度,因为测试范围仅限于我的机器和本地 SQL Server 实例。

不要误会我的意思,.NET 环境中的async/await范式在适当的情况下非常简单、强大和有用。然而,经过大量的努力,我不相信数据库访问是一个合适的用例。当然,除非您需要同时执行多个命令,在这种情况下,您可以简单地使用TPL来同时触发命令。

我更喜欢的方法是考虑以下因素:

  • 保持SQL 工作单元小、简单和可组合(即让您的 SQL 执行“便宜”)。
  • 避免在可以向上游推送到应用程序级别的 SQL Server 上进行工作。一个完美的例子就是排序。
  • 最重要的是,大规模测试您的 SQL 代码并查看 Statistics IO 输出/执行计划。以 10k 条记录快速运行的查询可能(并且可能会)在有 1M 条记录时表现完全不同。

您可以提出这样的论点,即在某些报告场景中,上述某些要求是不可能的。但是,在报告服务的上下文中,真的需要异步性(甚至是一个词吗?)?

微软布道者 Rick Anderson 有一篇关于这个主题的精彩文章。请注意,它已经过时(从 2009 年开始),但仍然非常重要。

于 2017-08-13T14:20:38.117 回答
1

从 C# 8 开始,这可以通过IAsyncEnumerable来完成

修改后的代码:

async IAsyncEnumerable<SomeClass> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(connectionString))
    using (SqlCommand cmd = new SqlCommand(sql, conn)
    {
        conn.Open();
        SqlDataReader reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            SomeClass someClass = f(reader); // create instance based on returned row
            yield return someClass;
        }
    } 
}

像这样消费它:

await foreach (var stuff in GetStuff())
    ...
于 2021-12-14T15:27:36.163 回答