我想知道是否有一种方便的方法来检查 observable 是否已完成。例如我有一个测试
test("An observable that tracks another observable is completed")
{
val sub = PublishSubject[Boolean](false)
val newOb = sub recovered // This methods returns an Observable[Try[T]]
val res = scala.collection.mutable.ListBuffer[Try[Boolean]]()
val cr = newOb subscribe( v => res += v, t => assert( false, "There shouldn't be an exception" ), () => println("Stream Completed") )
sub.onNext(true)
sub.onNext(false)
sub.onNext(true)
sub.onCompleted
assert( res.toList === List(Success(true), Success(false), Success(true) ))
newOb.isEmpty subscribe { v => assert( v == true, "Stream should be completed" ) }
}
该recovered
方法返回一个 Observable[Try[T]] 并且是标准 Observable 的扩展。我想在源 Observable 完成时检查 Observable[Try[T]] 是否完成。
因此,我编写了一个带有主题的测试,我向其中发布了一些值,然后最终完成。有没有一种简单的方法可以检查 newOb 是否也已完成?Observable 中没有类似 isCompleted 的方法。