2

在对一组冷的 observables 使用 combineLatest 时,我看到了一些意想不到的结果。它从除最后一个 Observable 之外的所有 Observable 中发出最新的,而是将第一个 (n-1) 个 Observable 中的最新与来自第 n 个 Observable 的每个元素相结合。

let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2)

_ = latestObserver
    .subscribe(onNext: {
    print($0)
})
.disposed(by: disposeBag)

产生输出: (4, "bed") (4, "book") (4, "table")

我原本预计会看到(4,“table”)的输出。

如果我像这样更改可观察对象的顺序:

let latestObserver = Observable.combineLatest(observable2, observable)

我得到输出: ("table", 1) ("table", 2) ("table", 3) ("table", 4)

如果我添加一个最终的任意 Observable,那么我只会看到每个第一个的最新的:

let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
let observable3 = Observable<Int>.just(42)

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2, observable3)

_ = latestObserver
    .subscribe(onNext: {
    print($0)
})
.disposed(by: disposeBag)

产生输出: (4, "table", 42)

这真的是预期的行为吗?

4

1 回答 1

1

让我们分解你的第一个例子中发生的事情......

您可以使用Observable.from代替主题并获得相同的结果...代码中的步骤如下,

  1. 创建两个ReplaySubjects 并用事件加载它们。
  2. combineLatest操作员订阅第一个主题。
  3. 第一个主题立即重播其所有价值观。
  4. 由于第二个主题还没有被订阅,combineLatest操作者不会发出任何东西,而是默默地吸收值,同时总是存储“最新”的。
  5. 然后combineLatest操作员订阅第二个主题。
  6. 该主题重播所有值。
  7. 由于combineLatest操作员现在已经从它的每个源接收到下一个事件,它发出从第二个源发出的值,并结合来自第一个源的最新(即最后一个)。

您的重播主题本质上是同步的。他们在订阅后立即将所有值发送给订阅者。

在您的最后一个示例代码中,由于三个中的最后一个 observable 只发出一个值,并且只有在其他两个发出所有值之后,您才会看到一个包含前两个 observable 的最新输出的输出。

于 2020-11-02T03:30:19.397 回答