0

我尝试使用循环中发生的多重Observer订阅。ObservableonNext似乎不适用于每个观察者。

import rx.lang.scala.Observable

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.currentThread().join()
}

仅针对第一个观察者的响应

first observer - hi~
first observer - hi~
...

为什么第二个收不到订阅?谢谢

4

1 回答 1

1

您的代码中的问题是您的代码Observable是同步的。这意味着第二个在第一个完成subscribe之前不会运行。subscribe而且由于您Observable的从未完成,因此第二个subscribe将无法运行。

要解决此问题,您需要使您的Observable异步。您可以使用subscribeOn在另一个线程中运行。例如,

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

object SubscribeMultiEvent extends App{
  val obv = Observable.apply[String]{ s =>
    def printForever: Unit = {
      s.onNext("hi~")
      Thread.sleep(1000)
      printForever
    }
    printForever
  }.subscribeOn(NewThreadScheduler())

  obv.subscribe(s => println(s"first observer - $s"))
  obv.subscribe(s => println(s"second observer - $s"))

  Thread.sleep(60000)
}

Thread.sleep(60000)最后很重要。RxJava 的 Threads 默认是 daemon,如果主线程结束,因为没有非 daemon 线程,JVM 会退出。为了防止主线程停止,您需要添加类似Thread.sleep(60000).

于 2016-03-21T17:25:49.587 回答