3

请看以下两种方法。第一个返回一个IAsyncEnumerable. 第二个试图消耗它。

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class SqlUtility
{
    public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
        string connectionString, SqlParameter[] parameters, string commandText,
        [EnumeratorCancellation]CancellationToken cancellationToken)
    {
        using (SqlConnection connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
            using (SqlCommand command = new SqlCommand(commandText, connection))
            {
                command.Parameters.AddRange(parameters);
                using (var reader = await command.ExecuteReaderAsync()
                    .ConfigureAwait(false))
                {
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    {
                        yield return reader;
                    }
                }
            }
        }
    }

    public static async Task Example()
    {
        const string connectionString =
            "Server=localhost;Database=[Redacted];Integrated Security=true";
        SqlParameter[] parameters = new SqlParameter[]
        {
            new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
        };
        const string commandText = "select * from Video where VideoID=@VideoID";
        IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
            parameters, commandText, CancellationToken.None);
        IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
        object videoID = firstRecord["VideoID"]; //Should be 1000.
        // Instead, I get this exception:
        // "Invalid attempt to call MetaData when reader is closed."
    }
}

当代码尝试读取结果IDataReader(at object videoID = firstRecord["VideoID"];)时,我得到了这个异常:

阅读器关闭时调用元数据的尝试无效。

这是因为SqlDataReader被处置。有人可以提供推荐的方法来SqlDataReader以异步方式枚举,以便调用方法可以使用每个结果记录吗?谢谢你。

4

5 回答 5

7

在这种情况下,LINQ 不是您的朋友,因为FirstAsync它将在迭代器返回结果之前关闭它,这不是 ADO.NET 所期望的;基本上:不要在这里使用 LINQ,或者至少:不要以这种方式。您可能可以使用类似的东西在序列仍然打开时Select执行投影,或者将这里的所有工作卸载到像 Dapper 这样的工具可能更容易。或者,手动进行:

await foreach (var record in records)
{
    // TODO: process record
    // (perhaps "break"), because you only want the first
}
于 2019-11-12T19:56:08.237 回答
3

您可以通过不返回依赖于仍然打开的连接的对象来避免这种情况。例如,如果您只需要VideoID,那么只需返回它(我假设它是一个int):

public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return reader["VideoID"];
    ...
}

或项目到您自己的班级:

public class MyRecord {
    public int VideoId { get; set; }
}

public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return new MyRecord {
                        VideoId = reader["VideoID"]
                    }
    ...
}

或者按照 Marc 的建议做,在第一个之后使用foreachand break,在你的情况下看起来像这样:

IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
    videoID = record["VideoID"];
    break;
}
于 2019-11-12T20:02:29.323 回答
2

当你公开一个 openDataReader时,关闭它和底层的责任Connection现在属于调用者,所以你不应该处置任何东西。相反,您应该使用DbCommand.ExecuteReaderAsync接受CommandBehavior参数的重载,并传递CommandBehavior.CloseConnection值:

执行命令时,关闭关联的 DataReader 对象时,关闭关联的 Connection 对象。

那你就只能希望调用者按规矩DataReader.Close办事,及时调用方法,在对象被垃圾回收之前不让连接打开。出于这个原因,暴露一个开放DataReader应该被认为是一种极端的性能优化技术,应该谨慎使用。

顺便说一句,如果您返回 anIEnumerable<IDataRecord>而不是IAsyncEnumerable<IDataRecord>.

于 2019-11-12T22:36:27.510 回答
1

要添加到其他答案,您可以使您的实用程序方法通用并添加一个投影委托,Func<IDataRecord, T> projection作为参数,如下所示:

public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
    string connectionString, SqlParameter[] parameters, string commandText,
    Func<IDataRecord, T> projection, // Parameter here
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    ...
                    yield return projection(reader); // Projected here
    ...
}

然后在调用时传入一个 lambda 或引用一个方法组,例如:

public static object GetVideoId(IDataRecord dataRecord)
    => dataRecord["VideoID"];

这样的:

GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);
于 2020-10-23T20:58:34.347 回答
0

在 2021 年末,我有这个确切的问题。我找不到一个完整的例子,所以我只是把我能找到的东西弄乱了,直到我找到了一些工作。

这是我的完整代码示例,虽然很简单(因此您可以稍后对其进行扩展),并附有一些评论,详细说明了我在此过程中遇到的一些坑:

// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
    // we need a connection that will last as long as the reader is open,
    // alternately you could pass in an open connection.
    using SqlConnection connection = new SqlConnection( connectionString );
    using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );

    await connection.OpenAsync();
    var reader = await cmd.ExecuteReaderAsync();
    while( await reader.ReadAsync() )
    {
        yield return func( reader );
    }
}

然后在您的(异步)代码的任何其他部分中,您可以await foreach循环调用您的函数:

private static async Task CallIAsyncEnumerable()
{
    await foreach( var category in ReadAsync( ReaderToCategory ) )
    {
        // do something with your category; save it in a list, write it to disk,
        // make an HTTP call ... the whole world is yours!
    }
}

// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
    return new Category()
    {
        Code = ( string )reader[ "Code" ],
        Group = ( string )reader[ "Group" ]
    };
}

我发现的其他几件事:您不能yield从 atry中,但您可以将所有内容(包括)填充cmd.ExecuteReaderAsync()到 atry或返回 DataReader 的单独方法中。或者您可以将其包装await foreach在一个try块中;我认为问题在于向尝试之外的调用者让步(这是有道理的,在你考虑之后)。

如果您使用另一种方法生成阅读器,请将连接传递给该方法,以便您控制其生命周期。如果您的方法创建连接、执行命令并返回SqlDataReader,则连接将关闭(如果您使用了“使用”),然后您才能从阅读器中读取。再一次,如果你仔细想想,这很有意义,但它让我绊倒了几分钟。

祝你好运,我希望这对其他人有所帮助!

于 2021-12-31T16:10:24.810 回答