To make sure I understand you correctly, it sounds like you want to ensure you carry on buffering items while only presenting each buffer when the previous buffer has been processed.
You also need to make the processing of each buffer asynchronous.
It's probably valuable to consider some theoretical points, because I have to confess that I'm a bit confused about the approach. IObservable is often said to be the dual of IEnumerable because it mirrors the latter with the key difference being that data is pushed to the consumer rather than the consumer pulling it as it chooses.
You are trying to use the buffered stream like an IEnumerable instead of an IObservable - you essentially want to pull the buffers rather than have them pushed to you - so I do have to wonder whether you have picked the right tool for the job. Are you trying to hold up the buffering operation itself while a buffer is processed? As a consumer having the data pushed at you, this isn't really a correct approach.
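To make the push versus pull distinction concrete, here is a minimal sketch that consumes the same three numbers both ways. The class name PushVersusPull and the log list are purely illustrative, and the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class PushVersusPull
{
    // Hypothetical helper: records how each item reached the consumer.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Pull: the consumer decides when to ask for the next item.
        IEnumerable<int> pull = Enumerable.Range(1, 3);
        foreach (var n in pull)
        {
            log.Add("pulled " + n);
        }

        // Push: the source calls the OnNext handler whenever it has an item.
        IObservable<int> push = Observable.Range(1, 3);
        using (push.Subscribe(n => log.Add("pushed " + n)))
        {
            // Nothing to do here; the handler above does the work.
        }

        return log;
    }
}
```

In the pull half the loop is in control, while in the push half the only thing the consumer supplies is a handler, which is why holding up the source from the consumer side feels unnatural.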
You could consider applying a ToEnumerable() call to the buffer operation, so that you can deal with the buffers when ready. That won't prevent ongoing buffering occurring while you deal with a current buffer though.
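As a rough sketch of the ToEnumerable() idea, assuming the System.Reactive package is referenced (the class name PullBuffersSketch and the log list are illustrative, not part of your code):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class PullBuffersSketch
{
    // Hypothetical helper: pulls each buffer from the observable in turn.
    public static List<string> Run()
    {
        var log = new List<string>();

        // ToEnumerable() turns the pushed buffers into a sequence we can pull from.
        IEnumerable<IList<int>> buffers =
            Observable.Range(1, 4).Buffer(2).ToEnumerable();

        foreach (var buffer in buffers)
        {
            // The loop body decides when to move on to the next buffer,
            // but the source is free to keep filling later buffers meanwhile.
            log.Add("Buffer: " + string.Join(",", buffer));
        }

        return log;
    }
}
```

The foreach only controls when you look at the next buffer; it does not pause the upstream source.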
There's little you can do to prevent this. Doing the buffer processing synchronously inside a Select() operation applied to the buffer would carry a guarantee that no subsequent OnNext() call would occur until the Select() projection completed. The guarantee comes for free as the Rx library operators enforce the grammar of Rx. But it's only guaranteeing non-overlapping invocations of OnNext() - there's nothing to say a given operator couldn't (or indeed shouldn't) carry on getting the next OnNext() ready to go. That's the nature of a push-based API.
It's very unclear why you think you need the projection to be asynchronous if you also want to block the buffers. Have a think about this - I suspect using a synchronous Select() in your observer might solve the issue, but it's not entirely clear from your question.
Similar to a synchronous Select() is a synchronous OnNext() handler, which is easier to handle if your processing of items has no results - but it's not the same because (depending on the implementation of the Observable) you are only blocking delivery of OnNext() calls to that Subscriber rather than all Subscribers. However, with just a single Subscriber it's equivalent, so you could do something like:
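    // LINQPad "C# Program"; needs System.Reactive.Linq and System.Threading.Tasks.
    void Main()
    {
        var source = Observable.Range(1, 4);
        source.Buffer(2)
            .Subscribe(buffer =>
            {
                Console.WriteLine("Start Processing Buffer");
                // Block this OnNext call until every upload in the buffer has finished.
                Task.WhenAll(from n in buffer select DoUpload(n)).Wait();
                Console.WriteLine("Finished Processing Buffer");
            });
    }

    // Simulates an asynchronous upload on a thread-pool thread.
    private Task DoUpload(int i)
    {
        return Task.Factory.StartNew(
            () => {
                Thread.Sleep(1000);
                Console.WriteLine("Process File " + i);
            });
    }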
Which outputs (with no guarantee on the order of Process File x within a Buffer):
Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer
If you prefer to use a Select() and your projection returns no results, you can do it like this:
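    source.Buffer(2)
        .Select(buffer =>
        {
            Console.WriteLine("Start Processing Buffer");
            // Block the projection until every upload in the buffer has finished.
            Task.WhenAll(from n in buffer select DoUpload(n)).Wait();
            Console.WriteLine("Finished Processing Buffer");
            // Unit (from System.Reactive) signals "no meaningful result".
            return Unit.Default;
        }).Subscribe();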
NB: Sample code written in LINQPad and including NuGet package Rx-Main. This code is for illustrative purposes - don't Thread.Sleep() in production code!