1

我试图通过超时限制可观察的寿命:

def doLongOperation() = {
   Thread.sleep(duration)
   "OK"
}

def firstStep = Observable.create(
  (observer: Observer[String]) => {
    observer.onNext(doLongOperation())
    observer.onCompleted()
    Subscription()
  }
)

firstStep
  .timeout(1 second)
  .subscribe(
    item => println(item),
    throwable => throw throwable,
    () => println("complete")
  ) 

我想区分以下结果:

  1. Observable 超时结束,没有得到结果
  2. 执行过程中抛出异常
  3. 执行成功,返回值

我可以在部分 onNext 和 onError 中毫无问题地处理案例 2 和 3,但是如何检测 observable 是否因超时完成?

还有一件事:尽管我的代码中有对 obeserver.onCompleted() 的调用,但我从来没有进入 block onComplete。为什么?

4

2 回答 2

1

如果发生超时,TimeoutException则会在计算线程上发出 a throw throwableis 最终被忽略,并且您的主线程不会也看不到它。您可以toBlocking在超时后添加,这样任何异常都会在同一个线程上结束:

firstStep
  .timeout(1 second)
  .toBlocking()
  .subscribe(
    item => println(item),
    throwable => println(throwable),
    () => println("complete")

)

于 2015-11-12T16:04:36.340 回答
0

TimeoutException 确实被抛出。该问题是由使用错误的库引起的。我的依赖项中有“com.netflix.rxjava”,而不是“io.reactivex”

于 2015-11-12T16:18:34.227 回答