首先,一些背景(也许有更好的方法):
我们有一个模块可以在特定的 Observable 上发出传入的蓝牙消息。然后我们处理这些消息,最后在最后订阅以发送消息。这种处理可能会在某个时候发生变化,这对于大多数处理意味着重新创建中间 Observables,以及依赖它的所有 observables(因为它们现在将处理无效数据)。
我们想改变它,以便重新创建处理的某些部分不需要重新创建依赖它的所有内容,主要是因为我们不必一直记住什么依赖于什么,并且还让具有内部状态的操作符(如缓冲、扫描或去抖动)不会丢失这个内部状态。
有希望的解决方案:
通过使用 switchOnNext 操作符,我们可以解决这个问题。每当重新创建中间 observable 时,我们只需将其添加到 switchOnNext 的原点,订阅 switchOnNext 输出的任何人都会立即获得新结果。
问题:
如果 switchOnNext 之后的处理必须改变,它将停止获取结果,直到之前的 observable 改变。这意味着我们现在遇到了相反的问题。每当某些部分发生变化时,我们必须递归地重新创建它所依赖的所有内容。这稍微好一点(跟踪依赖的东西比跟踪依赖它的所有东西要容易得多),但是 observables 仍然会丢失内部状态,因为它们必须重新创建。
这种行为似乎违反了文档所说的应该发生的事情,但它并没有明确说明一种或另一种方式。
示例代码:
此代码演示了该问题。
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
此代码的输出:
1: 1
2: 1
1: 3
2: 3
3: 3
预期输出:
1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing
第一次 obsAux1 发出时,所有三个订阅都应该打印出来,但只有在它被添加到 publishSubject 之前的那些会打印出来。
第二次 obsAux1 发出时,不应打印任何内容,因为 obsAux2 已经插入。这按预期工作
obsAux2 第一次发出时,所有四个订阅都应该打印。第三个订阅按预期打印,这应该订阅工作正常。但是第四个订阅没有打印任何内容,因为它是在将 obsAux2 插入到 publishSubject 之后添加的。