为什么会发生这种情况
在订阅的过程中,观察者通过Subscribe
通知表示它准备好接收可观察的项目。详见Observable
合同。
此外,Subject
文档指出:
请注意, aPublishSubject
可能会在创建后立即开始发出项目(除非您已采取措施防止这种情况发生),因此在创建和观察者订阅它之间存在一个或多个项目可能丢失的风险Subject
。
当您尝试通过 订阅新线程后subject.onNext(_)
立即.subscribeOn(Schedulers.computation())
调用时,可观察对象(即subject
)可能仍在等待Subscribe
观察者的通知。例如:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// this usually prints nothing!
subject.onNext(1)
但是,如果您在发出第一个项目之前添加一点时间延迟,则可观察对象更有可能Subscribe
在您调用之前收到观察者的通知subject.onNext(_)
。例如:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// wait for subscription to be established properly
Thread.sleep(1000)
// this usually prints "received item"
subject.onNext(1)
该怎么办?
如果您希望您的所有订阅都接收可观察对象发出的所有项目,您可以执行以下操作之一:
- 在调用之前阻塞主线程以等待所有观察者都订阅
subject.onNext(_)
。
- 创建一个新的 observable,它会等到所有 observable 都被订阅,然后再
subject.onNext(_)
在其内部调用。
这些也可能有用:
ReplaySubject
:这允许您存储所有以前项目的历史记录,并在每次订阅时重新发送它们。缺点:您需要在内存中存储任意数量的项目。
ConnectableObservable
:这确保了 observable 仅在.connect()
调用后才发出项目。特别是,.autoConnect(n)
操作员确保 observable 仅在n
观察者成功订阅后才发出。
示例:阻塞主线程直到订阅
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
val isSubscribedLatch = CountDownLatch(1)
subject
.subscribeOn(Schedulers.computation())
.doOnSubscribe { isSubscribedLatch.countDown() }
.map { it + 1 }
.subscribe {
countDownLatch.countDown()
println(Thread.currentThread().name)
}
isSubscribedLatch.await()
subject.onNext(1)
countDownLatch.await()