我想使用 Rx Buffer 功能:
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
这意味着自上次发出后缓冲区达到 5 或 30 秒的大小时会发生发出(发布给订阅者)。
但我需要能够按需发出 - 例如当我收到高优先级序列项目时。然后我想将它添加到 observable ( source.OnNext()
) 并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。
我知道我可以添加以下代码:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
并调用 flusher.OnNext(highPriorityItem) 我会发出它。
但在这种情况下,我有两个具有两个不同发射的独立序列。当缓冲区已满或特定项目按顺序出现时,我需要一个发射。
Force flush count-type Observable.Buffer c#和Force flush to Observable.Buffer c#似乎不适合我