0

我正在尝试自己切片可观察流,例如:

val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)

result.subscribe((buf) => println(buf.toString))

输出为:

Buffer()
Buffer()
Buffer()
Buffer()

sourceboundaries可能在到达之前在线迭代,result所以它只创建边界和结果缓冲区,但没有什么可填充的。

我的方法是使用publish/ connect

val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)

result2.subscribe((buf) => println(buf.toString))
source2.connect

这可以产生输出:

Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)

现在我只需connect要从外部世界隐藏connect它,当它result被订阅时(我在一个类中这样做,我不想暴露它)。就像是:

val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
          .tumblingBuffer(boundaries3)
          .doOnSubscribe(() => source3.connect)

result3.subscribe((buf) => println(buf.toString))

但是现在,doOnSubscribe动作永远不会被调用,所以发布source永远不会连接......

怎么了?

4

1 回答 1

1

您的publish解决方案走在了正确的轨道上。然而,还有一个替代publish运算符将 lambda 作为其类型的参数(请参阅文档Observable[T] => Observable[R]。此 lambda 的参数是原始流,您可以安全地多次订阅它。在 lambda 中,您可以根据自己的喜好转换原始流;在您的情况下,您过滤流并将其缓冲在该过滤器上。

Observable.from(1 to 10)
    .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
    .subscribe(buf => println(buf.toString()))

这个操作员最好的事情是你不需要在connect之后调用任何东西。

于 2016-05-10T06:30:23.557 回答