3

我最近开始玩rxjava-scala,我想创建一个(可能)无限流可观察。查看 github 上的代码和打开的问题,我发现“开箱即用”的解决方案尚未实现(问题中的 usecase06 说它甚至没有为 java 实现)

所以,我试图提出我自己的实现。考虑以下:

def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(100)
    s"next fibonacci: ${bi}"
  }
}

和一个辅助方法:

def startOnThread(body: => Unit): Thread = {
  val t = new Thread {
    override def run = body
  }
  t.start
  t
}

和示例核心:

val observable: Observable[String] = Observable(
  observer => {
    var cancelled = false
    val fs = getIterator
    val t = startOnThread{
      while (!cancelled) {observer.onNext(fs.next)}
      observer.onCompleted()
    }
    Subscription(new rx.Subscription {
      override def unsubscribe() = {
        cancelled = true
        t.join
      }
    })
  }
)

val observer = Observer(new rx.Observer[String]{
  def onNext(args: String) = println(args)
  def onError(e: Throwable) = logger.error(e.getMessage)
  def onCompleted() = println("DONE!")
})

val subscription = observable.subscribe(observer)
Thread.sleep(5000)
subscription.unsubscribe()

这似乎工作正常,但我对此不满意。首先,我正在创建一个新的Thread,这可能很糟糕。但即使我使用某种线程池,它仍然会感觉不对。所以我想我应该使用调度程序,这听起来像是一个合适的解决方案,只是我不知道如何在这种情况下使用它。我尝试rx.lang.scala.concurrency.Schedulers.threadPoolForIO使用该observeOn方法进行补充,但似乎我做错了。observable 的代码不会用它编译。任何帮助将不胜感激。谢谢!

4

1 回答 1

1

首先,已经有将 Iterable 转换为 Observable 的适配器:“from”函数。

秒,迭代器不会返回控制,所以你的睡眠和取消订阅不会被调用。您需要在专用线程“subscribeOn(NewThreadScheduler())”中执行订阅操作

def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(1000)
    s"next fibonacci: ${bi}"
  }
}

val sub = Observable.from(getIterator.toIterable)
  .subscribeOn(NewThreadScheduler())
  .subscribe(println(_))
readLine()
sub.unsubscribe()
println("fib complete")
readLine()
于 2014-04-23T21:55:10.097 回答