这里的问题是你在这里混合了 RxJava 和 RxScala 代码。你看,RxScala 只是 RxJava 功能的一个包装器;前者只转发给后者,并且没有任何“真正的”实现。这很有用,因为您只需要维护 1 个版本而不是 2 个或更多。
scheduler
您的示例中的类型是rx.Scheduler
,因此它是 RxJava Scheduler
。但是,subscribeOn
需要您提供rx.lang.scala.Scheduler
一个 RxScala Scheduler
。因此,您需要将 RxJava 转换为 RxScalaScheduler
中的一个。
但是,对于您的情况,有一种更好的处理方式:使用工厂方法将您包装Executors.newSingleThreadExecutor
到 a中。然后把它包装成一个,你就有了可以在. 你的代码看起来像这样(我包含了一个打印语句来查看这些东西在哪个线程上运行):scala.concurrent.ExecutionContext
fromExecutor
rx.lang.scala.schedulers.ExecutionContextScheduler
subscribeOn
val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)
Observable.just(1, 2, 3)
.subscribeOn(s)
.doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
.subscribe()