2

我正在学习 RxScala 并来到这个非常合成的片段。我正在尝试处理 onError 块中的异常:

def doLongOperation():String = {
  (1 to 10).foreach {
    _ =>
      Thread.sleep(100)
      print(".")
  }
  println()
  if (System.currentTimeMillis() % 2 == 0) {
    throw new RuntimeException("Something went wrong during long operation")
  }
  s"OK"
}

def main(args: Array[String]) {
  println("Changing status:  -> doing task1")
  Observable.just(
    doLongOperation()
  ).subscribe(
    str => println("Changing status: doing task1 -> doing task2"),
    throwable => println(s"Failed: ${throwable.getMessage}"),     //never get here
    () => println("Completed part")
  )
}

万一出现异常,我希望是这样的:

Failed: Something went wrong during long operation

但我得到的是:

.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation
at stats.STest$.doLongOperation(STest.scala:20)
at stats.STest$.main(STest.scala:49)
at stats.STest.main(STest.scala)

我错过了什么?我应该在观察者处“手动”调用 onError 吗?感谢任何帮助。

4

2 回答 2

1

Observable.just不能很好地处理异常情况,不确定这是错误还是预期的行为。您可以尝试这种方式:

Observable.create[String]( o => {
  o.onNext(doLongOperation())
  o.onCompleted()
  Subscription{}
}).subscribe(
  str => println("Changing status: doing task1 -> doing task2"),
  throwable => println(s"Failed: ${throwable.getMessage}"),    here
  () => println("Completed part")
)
于 2015-11-05T22:32:28.760 回答
1

问题是对just() 的误解。它在组装序列时采用现有值,而不是在订阅者订阅时执行的方法。换句话说,您的代码执行以下操作:

var tempValue = doLongOperation();

Observable.just(tempValue).subscribe(...)

甚至在创建 Observable 之前抛出。

(抱歉,我不太了解 Scala 或 RxScala,所以请原谅我的 Java 8 示例。)

我不知道 RxScala 落后于 RxJava 多远,但是 RxJava 1.0.15 有一个新的工厂方法,fromCallable它可以让你延迟一个值:

Observable.fromCallable(() -> doLongOperation()).subscribe(...)

另一种方法是将您的原始源代码包装到 a 中defer,以便在doLongOperation抛出时将其路由到订阅者:

Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...)
于 2015-11-05T22:49:53.793 回答