1

我在 Scala 项目中使用 RxJava 并说我有这个简单的Observable

Observable[String](observer => while (true) observer onNext "hi")
  .subscribe(v => println(v))

println("hello")

我永远不会收到“你好”消息,因为while阻塞了一个线程。如何Observable在单独的线程中运行以避免阻塞?

====================================

我在想observeOn可能会有所帮助,但事实并非如此。运行这个:

val s = rx.lang.scala.schedulers.NewThreadScheduler.apply

Observable[String](observer => while (true) observer onNext "hi")
  .observeOn(s).subscribe(v => println(v))

println("hello")

...仍然不打印“你好”。我猜想添加observeOnmakeOnNext在单独的线程中调用而不是while块本身?

====================================

我当然可以换成while一个Future

Observable[String](observer => Future { while (true) observer onNext "hi" })
  .subscribe(v => println(v))

println("test") // "test" gets printed

但也许存在更多的 rx 惯用方式来做到这一点?

4

1 回答 1

4

你需要使用subscribeOn. 例如,

Observable[String](observer => while (true) observer onNext "hi").subscribeOn(s)
  .subscribe(v => println(v))

subscribeOn将调用 中的subscribe函数Scheduler,同时observeOn将消息发送到Scheduler.

于 2014-06-05T03:18:57.883 回答