我正在尝试自己切片可观察流,例如:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
输出为:
Buffer()
Buffer()
Buffer()
Buffer()
source
boundaries
可能在到达之前在线迭代,result
所以它只创建边界和结果缓冲区,但没有什么可填充的。
我的方法是使用publish
/ connect
:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
这可以产生输出:
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
现在我只需connect
要从外部世界隐藏connect
它,当它result
被订阅时(我在一个类中这样做,我不想暴露它)。就像是:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
但是现在,doOnSubscribe
动作永远不会被调用,所以发布source
永远不会连接......
怎么了?