2

我只是想掌握可观察的热和冷之间的概念,并尝试使用 Monifu 库。我的理解是,下面的代码应该导致只有一个订阅者获得 Observable 发出的事件,但事实并非如此!

scala> :paste
// Entering paste mode (ctrl-D to finish)

import monifu.reactive._
import scala.concurrent.duration._

import monifu.concurrent.Implicits.globalScheduler

val obs = Observable.interval(1.second).take(10)

val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))

// Exiting paste mode, now interpreting.

from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
x: Unit = ()
y: Unit = ()

scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9

所以,对我来说,这看起来像是 Observable 正在向所有感兴趣的订阅者发布事件?

4

1 回答 1

2

我是Monifu的主要作者。

的observable 意味着它的 subscribe 函数为每个订阅者(每次subscribe()调用)启动一个新的数据源,而的observable 在多个订阅者之间共享同一个数据源。

例如,将文件视为数据源。让我们为一个简单的 Observable 建模,它从文件中发出行:

def fromFile(file: File): Observable[String] = {
  // this is the subscribe function that
  // we are passing to create ;-)
  Observable.create { subscriber =>
    // executing things on our thread-pool
    subscriber.scheduler.execute {
      val source = try {
        Observable.fromIterable(scala.io.Source
          .fromFile(file).getLines().toIterable)
      } 
      catch {
        // subscribe functions must be protected
        case NonFatal(ex) =>
          Observable.error(ex)
      }

      source.unsafeSubscribe(subscriber)
    }
  }
}

这个函数创建了一个冷的 observable。这意味着它将为每个订阅的观察者打开一个新的文件句柄,然后为每个订阅的观察者读取并发出行。

但是我们可以把它变成一个 hot observable:

// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()

然后不同的是当你这样做时:

val x = observable.subscribe()
val y = observable.subscribe()

如果 observable 很热:

  1. connect()在你调用它之前,observable 不会做任何事情
  2. 之后connect(),打开同一个文件,两者都会收到完全相同的事件
  3. 在发出该文件中的所有行之后,新订阅者将不会得到任何东西,因为(共享)数据源已经耗尽

如果 observable 是冷的:

  1. 在每次订阅时,都会打开并读取一个新的文件句柄
  2. 元素在 之后立即发出subscribe(),因此无需等待connect()
  3. 所有订阅的观察者都将收到该文件中的所有行,无论他们何时这样做

一些也适用于 Monifu 的参考资料:

  1. 来自 RxJava 的 wiki 的 Connectable Observable
  2. Rx 简介:冷热 Observables
  3. 来自 RxJava wiki 的主题
于 2015-07-30T12:13:53.523 回答