我最近开始玩rxjava-scala,我想创建一个(可能)无限流可观察。查看 github 上的代码和打开的问题,我发现“开箱即用”的解决方案尚未实现(问题中的 usecase06 说它甚至没有为 java 实现)。
所以,我试图提出我自己的实现。考虑以下:
def getIterator: Iterator[String] = {
def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
fib(1, 1).iterator.map{bi =>
Thread.sleep(100)
s"next fibonacci: ${bi}"
}
}
和一个辅助方法:
def startOnThread(body: => Unit): Thread = {
val t = new Thread {
override def run = body
}
t.start
t
}
和示例核心:
val observable: Observable[String] = Observable(
observer => {
var cancelled = false
val fs = getIterator
val t = startOnThread{
while (!cancelled) {observer.onNext(fs.next)}
observer.onCompleted()
}
Subscription(new rx.Subscription {
override def unsubscribe() = {
cancelled = true
t.join
}
})
}
)
val observer = Observer(new rx.Observer[String]{
def onNext(args: String) = println(args)
def onError(e: Throwable) = logger.error(e.getMessage)
def onCompleted() = println("DONE!")
})
val subscription = observable.subscribe(observer)
Thread.sleep(5000)
subscription.unsubscribe()
这似乎工作正常,但我对此不满意。首先,我正在创建一个新的Thread
,这可能很糟糕。但即使我使用某种线程池,它仍然会感觉不对。所以我想我应该使用调度程序,这听起来像是一个合适的解决方案,只是我不知道如何在这种情况下使用它。我尝试rx.lang.scala.concurrency.Schedulers.threadPoolForIO
使用该observeOn
方法进行补充,但似乎我做错了。observable 的代码不会用它编译。任何帮助将不胜感激。谢谢!