3

我想知道是否有一种方便的方法来检查 observable 是否已完成。例如我有一个测试

test("An observable that tracks another observable is completed")
{
      val sub = PublishSubject[Boolean](false)
      val newOb = sub recovered // This methods returns an Observable[Try[T]] 


      val res = scala.collection.mutable.ListBuffer[Try[Boolean]]()
      val cr = newOb subscribe( v => res += v, t => assert( false, "There shouldn't be an exception" ), () => println("Stream Completed") )
      sub.onNext(true)
      sub.onNext(false)
      sub.onNext(true)
      sub.onCompleted
      assert( res.toList === List(Success(true), Success(false), Success(true) ))
      newOb.isEmpty subscribe { v => assert( v == true, "Stream should be completed" ) }
  }

recovered方法返回一个 Observable[Try[T]] 并且是标准 Observable 的扩展。我想在源 Observable 完成时检查 Observable[Try[T]] 是否完成。

因此,我编写了一个带有主题的测试,我向其中发布了一些值,然后最终完成。有没有一种简单的方法可以检查 newOb 是否也已完成?Observable 中没有类似 isCompleted 的方法。

4

1 回答 1

2

这就是观察者模式的本质,当有调用onCompleted时,触发相应的handler,只有观察者完成才能理解。但是我听说如果 Observer 已经完成并附加到处理程序,它会立即工作,但我认为它已经在 asJavaObserver 的较低级别实现。

该链接可能会有所帮助: Netflix RxJava

于 2014-01-04T13:16:24.887 回答