6

我有从 SQL 流式传输数据并将其写入不同存储的代码。代码大致是这样的:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
     using (var reader = await cmd.ExecuteReaderAsync())
     {
         var list = new List<MyData>();
         while (await reader.ReadAsync())
         {
             var row = GetRow(reader);
             list.Add(row);
             if (list.Count == BatchSize)
             {
                 await WriteDataAsync(list);
                 list.Clear();
             }
         }
         if (list.Count > 0)
         {
             await WriteDataAsync(list);
         }
     }
 }

我想为此使用反应式扩展。理想情况下,代码如下所示:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

但是,扩展方法 ForEachAsync 似乎只接受同步操作。是否可以编写一个接受异步操作的扩展?

4

4 回答 4

5

是否可以编写一个接受异步操作的扩展?

不是直接的。

Rx 订阅必须是同步的,因为 Rx 是基于推送的系统。当一个数据项到达时,它会遍历您的查询,直到它到达最终订阅——在本例中是执行一个Action.

awaitRx 提供的-able 方法是await序列本身- 即,ForEachAsync就序列而言是异步的(您正在异步等待序列完成),但其中的订阅ForEachAsync(对每个元素采取的操作)仍然必须是同步的.

为了在数据管道中进行同步到异步的转换,您需要有一个缓冲区。Rx 订阅可以(同步)作为生产者添加到缓冲区,而异步消费者正在检索项目并处理它们。因此,您需要一个同时支持同步和异步操作的生产者/消费者队列。

TPL Dataflow 中的各种块类型可以满足这一需求。像这样的东西就足够了:

var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion;

请注意,没有背压;尽可能快地StreamDataFromSql推送数据,它将被缓冲并存储在ActionBlock. 根据数据的大小和类型,这会很快使用大量内存。

于 2017-07-29T01:29:24.650 回答
0

正确的做法是正确使用 Reactive Extensions 来完成这项工作 - 所以从创建连接的那一刻开始,直到您写入数据。

就是这样:

IObservable<IList<MyData>> query =
    Observable
        .Using(() => new SqlConnection(""), connection =>
            Observable
                .Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
                    Observable
                        .Using(() => cmd.ExecuteReader(), reader =>
                            Observable
                                .While(() => reader.Read(), Observable.Return(GetRow(reader))))))
        .Buffer(BatchSize);

IDisposable subscription =
    query
        .Subscribe(async list => await WriteDataAsync(list));

我无法测试代码,但它应该可以工作。这段代码假设也WriteDataAsync可以带一个IList<MyData>。如果它不只是放在一个.ToList().

于 2017-07-29T01:20:03.030 回答
0

ForEachAsync这是支持异步操作的方法的一个版本。它将源 observable 投影到IObservable<IObservable<Unit>>包含异步操作的嵌套中,然后将其展平回IObservable<Unit>usingMerge运算符。最终的 observable 最终被转换为一个任务。

默认情况下,操作是按顺序调用的,但可以通过配置可选maximumConcurrency参数同时调用它们。

取消可选cancellationToken参数导致返回的立即完成(取消)Task,可能在取消当前运行的操作之前。

任何可能发生的异常都会通过 , 传播Task,并导致取消所有当前正在运行的操作。

/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, Task> action,
    CancellationToken cancellationToken = default,
    int maximumConcurrency = 1)
{
    // Arguments validation omitted
    return source
        .Select(item => Observable.FromAsync(ct => action(item, ct)))
        .Merge(maximumConcurrency)
        .DefaultIfEmpty()
        .ToTask(cancellationToken);
}

使用示例:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));
于 2020-11-20T20:42:47.300 回答
-1

这是ForEachAsync 的源代码一篇关于 ToEnumerable 和 AsObservable 方法的文章

我们可以围绕 ForEachAsync 做一个包装器,它将等待一个任务返回函数:

public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
    foreach ( var x in t.ToEnumerable() )
        await onNext( x );
}

示例用法:

await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
于 2017-07-28T23:58:52.047 回答