2

我想使用 Rx Buffer 功能:

var source = new Subject<Price>();
var buffer = source
    .Buffer(TimeSpan.FromSeconds(30), 5)
    .Where(p => p.Any());

这意味着自上次发出后缓冲区达到 5 或 30 秒的大小时会发生发出(发布给订阅者)。

但我需要能够按需发出 - 例如当我收到高优先级序列项目时。然后我想将它添加到 observable ( source.OnNext()) 并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。

我知道我可以添加以下代码:

var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);

并调用 flusher.OnNext(highPriorityItem) 我会发出它。

但在这种情况下,我有两个具有两个不同发射的独立序列。当缓冲区已满或特定项目按顺序出现时,我需要一个发射。

Force flush count-type Observable.Buffer c#Force flush to Observable.Buffer c#似乎不适合我

4

2 回答 2

5

我认为 decPL 的基本思想就在这里,但他的解决方案并不稳定。根据inputobservable 的调度程序,即使它以正确的顺序订阅,您也可以获得不可预测的结果。这是因为有多个独立订阅input. 您需要通过.Publish(...)调用来推动这一切,以确保只有一个订阅。

此外,它还需要一种在处理订阅时进行清理的方法。所以它也需要通过一个.Create(...)调用来运行。

就是这样:

var input = new Subject<Price>();

IObservable<IList<Price>> query =
    input
        .Publish(i =>
            Observable
                .Create<IList<Price>>(o =>
                {
                    var timeBuffer =
                        Observable
                            .Timer(TimeSpan.FromSeconds(10.0))
                            .Select(n => Unit.Default);
                    var flush =
                        i
                            .Where(p => p.IS_IMPORTANT)
                            .Select(n => Unit.Default);
                    var sizeBuffer =
                        i
                            .Buffer(5)
                            .Select(l => Unit.Default);
                    return
                        i
                            .Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush))
                            .SelectMany(w => w.ToList())
                            .Subscribe(o);
                }));

query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w));
于 2017-10-11T23:27:10.577 回答
3

编辑:@Enigmativity 是绝对正确的,请参阅他的回答。保持这个完整,因为希望在这里确定思考过程会更容易一些。

尝试如下操作:

var input = new Subject<Price>(); //your input observable

var flush = new Subject<long>(); //used to manually flush the 'buffer' for important prices
var timeBuffer
   = Observable.Timer(TimeSpan.FromSeconds(10)); //controls the time-based part of 'buffer'
var sizeBuffer = input.Buffer(5).Select(l => 0L); //controls the size-based part of 'buffer'

var bufferedInput = input.Window(()=>Observable.Merge(timeBuffer, sizeBuffer, flush))
                         .SelectMany(w => w.ToList())
                         .Subscribe(w => DO_SOMETHING_WITH_PRICES(w));

//Flush on important price (NOTE - order of the two subscriptions matter)
input.Where(p => p.IS_IMPORTANT).Subscribe(p => flush.OnNext(0L));
于 2017-10-11T14:05:16.640 回答