1

我尝试

val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)

输出错误

Error:(103, 43) type mismatch;
found   : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
                                      ^ 

如何在中使用自定义调度程序RxScala

4

1 回答 1

2

这里的问题是你在这里混合了 RxJava 和 RxScala 代码。你看,RxScala 只是 RxJava 功能的一个包装器;前者只转发给后者,并且没有任何“真正的”实现。这很有用,因为您只需要维护 1 个版本而不是 2 个或更多。

scheduler您的示例中的类型是rx.Scheduler,因此它是 RxJava Scheduler。但是,subscribeOn需要您提供rx.lang.scala.Scheduler一个 RxScala Scheduler。因此,您需要将 RxJava 转换为 RxScalaScheduler中的一个。

但是,对于您的情况,有一种更好的处理方式:使用工厂方法将您包装Executors.newSingleThreadExecutor到 a中。然后把它包装成一个,你就有了可以在. 你的代码看起来像这样(我包含了一个打印语句来查看这些东西在哪个线程上运行):scala.concurrent.ExecutionContextfromExecutorrx.lang.scala.schedulers.ExecutionContextSchedulersubscribeOn

val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)

Observable.just(1, 2, 3)
    .subscribeOn(s)
    .doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
    .subscribe()
于 2016-03-28T14:00:34.030 回答