我有一个IObservable<IObservable<T>>
每个内部IObservable<T>
都是一个值流,然后是一个最终OnCompleted
事件。
我想将其转换为IObservable<IEnumerable<T>>
, 一个由任何未完成的内部流中的最新值组成的流。IEnumerable<T>
每当从内部流之一产生新值(或内部流到期)时,它都应该产生一个新值
它最容易用大理石图显示(我希望它足够全面):
input ---.----.---.----------------
| | '-f-----g-|
| 'd------e---------|
'a--b----c-----|
result ---a--b-b--c-c-c-e-e-e---[]-
d d d e f g
f f
([]
为空IEnumerable<T>
,-|
代表 OnCompleted)
你可以看到它有点像一个CombineLatest
操作。我一直在玩,Join
但GroupJoin
无济于事,但我觉得这几乎可以肯定是前进的正确方向。
我想在这个运算符中使用尽可能少的状态。
更新
我已经更新了这个问题,不仅包括单值序列——结果IObservable<IEnumerable<T>>
应该只包括每个序列的最新值——如果一个序列没有产生一个值,它不应该被包括在内。