我有从 SQL 流式传输数据并将其写入不同存储的代码。代码大致是这样的:
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}
我想为此使用反应式扩展。理想情况下,代码如下所示:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));
但是,扩展方法 ForEachAsync 似乎只接受同步操作。是否可以编写一个接受异步操作的扩展?