3

假设您有以下Observable内容rxjava-scala-0.18.4

@volatile var dorun = true
var subscriber: Subscriber[String] = null
val myObs = Observable { obs: Subscriber[String] =>
  subscriber = obs
  Subscription { println("unsubscribed"); dorun = false }
}

val sub = myObs.head.subscribe(println(_))

assertTrue(dorun)

subscriber.onNext("hello")
Thread.sleep(500)
assertFalse(dorun)

subscriber.onNext("world")
Thread.sleep(500)
assertFalse(dorun)

第二个断言失败,这意味着head没有取消订阅。我对 Observables 的理解是错误的还是应该head在第一个元素发出后取消订阅?

4

1 回答 1

3

看看你的subscribe()方法:你循环直到run设置为false,但发生这种情况的唯一方法是关闭订阅。问题是还没有人订阅: 循环让你无法返回。在第head一个项目交付后,运营商无法终止基础订阅,因为它还没有完成订阅。因此,您只是永远循环。

一种解决方案是将您的循环移动到安排在Schedulers.trampoline(). 然后事件将在从subscribe().

此外,在您的subscribe()方法中,您似乎需要将新的订阅对象添加到Subscriber传入的对象中,如下所示:

val myObs = Observable {
    obs: rx.lang.scala.Subscriber[String] =>
        ...

        obs.add(
            Subscription {
                dorun = false
                println("unsubscribed")
            }
        )
}
于 2014-06-09T17:35:14.200 回答