
I am using Reactive Extensions (Rx) to buffer some data. The problem is that I then need to do something asynchronous with each buffer, and I don't want the next buffered group to be passed through until that asynchronous operation is complete.

I've tried to structure the code two ways (contrived example):

public async Task processFiles<File>(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f))) //Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { /* Stuff */ });
}

or

public async Task processFiles<File>(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async group => await Task.WhenAll(group.Select(f => upload(f))));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { /* Stuff */ });
}

Unfortunately, neither of these approaches has worked, because the buffer pushes the next group before the async operations complete. The intent is to have each buffered group executed asynchronously and to continue with the next buffered group only when that operation is complete.

Any help is greatly appreciated.


2 Answers


To make sure I understand you correctly, it sounds like you want to ensure you carry on buffering items while only presenting each buffer when the previous buffer has been processed.

You also need to make the processing of each buffer asynchronous.

It's probably valuable to consider some theoretical points, because I have to confess that I'm a bit confused about the approach. IObservable is often said to be the dual of IEnumerable because it mirrors the latter with the key difference being that data is pushed to the consumer rather than the consumer pulling it as it chooses.

You are trying to use the buffered stream like an IEnumerable instead of an IObservable (you essentially want to pull the buffers rather than have them pushed to you), so I do have to wonder whether you have picked the right tool for the job. Are you trying to hold up the buffering operation itself while a buffer is processed? As a consumer having the data pushed at you, this isn't really a correct approach.

You could consider applying a ToEnumerable() call to the buffer operation, so that you can deal with the buffers when you are ready. That won't prevent buffering from continuing while you deal with the current buffer, though.

There's little you can do to prevent this. Doing the buffer processing synchronously inside a Select() operation applied to the buffer would guarantee that no subsequent OnNext() call occurs until the Select() projection completes. The guarantee comes for free, because the Rx library operators enforce the grammar of Rx. But it only guarantees non-overlapping invocations of OnNext(); nothing stops a given operator from carrying on getting the next OnNext() ready to go (and indeed it should). That's the nature of a push-based API.

It's unclear why you need the projection to be asynchronous if you also want to block the buffers. Have a think about this: I suspect using a synchronous Select() in your observer might solve the issue, but it's not entirely clear from your question.

Similar to a synchronous Select() is a synchronous OnNext() handler, which is easier to work with if your processing of items has no results. It's not the same, though, because (depending on the implementation of the Observable) you are only blocking delivery of OnNext() calls to that Subscriber rather than to all Subscribers. With just a single Subscriber it's equivalent, so you could do something like:

void Main()
{
    var source = Observable.Range(1, 4);

    source.Buffer(2)
        .Subscribe(i =>
    {
        Console.WriteLine("Start Processing Buffer");
        Task.WhenAll(from n in i select DoUpload(n)).Wait();
        Console.WriteLine("Finished Processing Buffer");
    });
}

private Task DoUpload(int i)
{
    return Task.Factory.StartNew(
        () => {
            Thread.Sleep(1000);
            Console.WriteLine("Process File " + i);
        });
}

This outputs the following (with no guarantee on the order of the "Process File x" lines within a buffer):

Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer

If you prefer to use a Select() and your projection returns no results, you can do it like this:

source.Buffer(2)
    .Select(i =>
{
    Console.WriteLine("Start Processing Buffer");
    Task.WhenAll(from n in i select DoUpload(n)).Wait();
    Console.WriteLine("Finished Processing Buffer");
    return Unit.Default;
}).Subscribe();

NB: Sample code written in LINQPad with the NuGet package Rx-Main included. This code is for illustrative purposes - don't Thread.Sleep() in production code!
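
For comparison, here is a sketch of a non-blocking variant that is not taken from the answer above. It assumes the System.Reactive package, and the names RxGroupExtensions and ExecuteInGroupsRxAsync are made up for illustration. Each buffer is wrapped in a deferred Observable.FromAsync(), and Concat() subscribes to those one at a time, so the uploads of a group start only after the previous group's tasks have finished. Buffer() itself still runs ahead; Concat() merely queues the pending groups.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RxGroupExtensions
{
    // Hypothetical helper: processes groups of groupSize items one group at a time,
    // running func for all items of a group concurrently.
    public static async Task ExecuteInGroupsRxAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        await source.ToObservable()
            .Buffer(groupSize)
            // FromAsync is deferred: the tasks are created only on subscription.
            .Select(g => Observable.FromAsync(() => Task.WhenAll(g.Select(func))))
            // Concat subscribes to the next inner sequence only after the
            // previous one has completed.
            .Concat()
            // LastOrDefaultAsync avoids an exception when source is empty.
            .LastOrDefaultAsync();
    }
}
```

A call such as files.ExecuteInGroupsRxAsync(upload, 10) would then replace the Subscribe-and-Wait pattern, at the cost of holding the queued buffers in memory.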

answered 2013-06-13T05:22:08.173

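First, I think your requirement to execute the items within each group in parallel, but the groups themselves in series, is quite unusual. A more common requirement is to execute the items in parallel, but with at most n of them running at the same time. That way there are no fixed groups, so if a single item takes too long, the other items don't have to wait for it.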
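
To make the "at most n at the same time" requirement concrete, here is a minimal sketch using SemaphoreSlim. It is not part of the original answer, and the names ThrottleExtensions and ExecuteThrottledAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleExtensions
{
    // Hypothetical helper: runs func over every item, allowing at most
    // maxConcurrency invocations to be in flight at once.
    public static async Task ExecuteThrottledAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = source.Select(async item =>
            {
                await gate.WaitAsync();       // wait for a free slot
                try { await func(item); }
                finally { gate.Release(); }   // hand the slot to the next item
            }).ToList();                      // ToList forces the lazy Select to run

            await Task.WhenAll(tasks);
        }
    }
}
```

Unlike fixed groups, a slow item here only occupies one slot; the remaining slots keep draining the rest of the items.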

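To do what you're asking for, I think TPL Dataflow is more suitable than Rx (though some Rx code will still be useful). TPL Dataflow is centered on "blocks" that execute things, by default in series, which is exactly what you need.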

Your code could look like this:

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        // ActionBlock handles one message at a time by default, so the next
        // group starts only after the previous group's Task has completed.
        var block = new ActionBlock<IEnumerable<T>>(
            g => Task.WhenAll(g.Select(func)));
        source.ToObservable()
              .Buffer(groupSize)
              .Subscribe(block.AsObserver());
        return block.Completion;
    }
}

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

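This leaves most of the heavy lifting to the ActionBlock (and some to Rx). Dataflow blocks can act as Rx observers (and observables), so we can take advantage of that to keep using Buffer().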

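We want to handle the whole group at once, so we use Task.WhenAll() to create a Task that completes when the whole group completes. Dataflow blocks understand Task-returning functions, so the next group won't start executing until the Task returned for the previous group completes.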

The final result is the Completion Task, which will complete after the source observable completes and all processing is done.

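TPL Dataflow also has BatchBlock, which works like Buffer(), and we could Post() each item from the collection to it directly (without using ToObservable() and AsObserver()), but I think using Rx for this part of the code makes it simpler.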
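
For reference, the BatchBlock variant mentioned above could look roughly like the following sketch. It assumes the System.Threading.Tasks.Dataflow package; the names BatchBlockExtensions and ExecuteInGroupsWithBatchBlockAsync are hypothetical and do not come from the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockExtensions
{
    // Hypothetical helper: groups items with BatchBlock instead of Rx Buffer().
    public static Task ExecuteInGroupsWithBatchBlockAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        var batcher = new BatchBlock<T>(groupSize);

        // Default ActionBlock options process one batch at a time.
        var worker = new ActionBlock<T[]>(g => Task.WhenAll(g.Select(func)));

        // PropagateCompletion forwards Complete() from the batcher to the worker.
        batcher.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in source)
            batcher.Post(item);

        // Complete() also flushes a final batch smaller than groupSize.
        batcher.Complete();
        return worker.Completion;
    }
}
```

The trade-off is an explicit loop and link setup in exchange for dropping the Rx dependency from this method.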

EDIT: Actually, you don't need TPL Dataflow here at all. Using ToEnumerable() as James World suggested is enough:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

Or, even simpler, without Rx, using Batch() from morelinq:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}
answered 2013-06-13T12:32:29.450