4

有没有办法确保订阅者更新的顺序得到保证?

我有一个 hot observable,我的第一个订阅者做了一些同步工作来更新一个变量,然后我的下一个订阅者必须初始化一个服务(只有一次!),并且只有在确保设置了该变量之后!

它看起来像这样:

import App from './App'

var appSource = App.init() // gets the hot observable

// our second subscriber
appSource.take(1).subscribe(() => {
  // take 1 to only run this once
  nextService.init()
})

App.init看起来像这样:

...
init() {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => this.myVar = data)

  return source
}
...

这目前有效,但我不确定它是否会始终遵循 100% 的顺序。

编辑:

正如我所听说的,订阅者将被调用 FIFO。所以顺序有些保证。

4

1 回答 1

2

我不知道 RxJS 是否曾明确保证按订阅顺序调用观察者。但是,正如您所说,它通常有效。

但是,您可能会考虑对实际工作流程进行建模,而不是依赖隐式观察者顺序。

听起来您需要知道您的应用程序何时初始化,以便您可以采取进一步的行动。与其依赖于内部工作原理的知识,不如App.init为此App公开一个 API:

一种(非 Rx 方式)是让调用者提供回调init

//...
init(callback) {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => {
    this.myVar = data;
    if (callback) {
        callback();
        callback = undefined;
    }
  })

  return source
}

// elsewhere
App.init(() => nextService.init());

另一个选项而不是回调是在初始化完成后只init返回Promise你的解析(或你发出的信号)。Rx.AsyncSubject

还有另一种选择,但需要进行一些重构,是将其建模this.myVar为可观察数据。IE:

init() {
    this.myVar = this.createObservable().replay(1);
    this.myVar.connect();
    // returns an observable that signals when we are initialized
    return this.myVar.first();
}

// elsewhere, you end up with this pattern...
const servicesToInit = [ App, service1, service2, service3 ];
Observable
    .of(servicesToInit)
    .concatMap(s => Rx.Observable.defer(() => s.init()))
    .toArray()
    .subscribe(results => {
        // all initializations complete
        // results is an array containing the value returned by each service's init observable
    });

现在,任何想要使用的东西myVar总是需要以某种方式订阅它以获得当前和/或未来的值。他们永远不能只是同步地询问当前值。

于 2015-07-29T14:48:03.810 回答