1

我有一个方法,getObs()它返回一个 observable,它应该由所有调用者共享。但是,当有人调用时,该 observable 可能不存在getObs(),并且创建它是一个异步操作,所以我的想法是返回一个占位符 observable,一旦创建,它就会被真正的 observable 替换。

我的基本尝试是这样的:

var createSubject = new Rx.Subject();
var placeholder = createSubject.switchLatest();

placeholder如果调用 'getObs()' 时真正的 observable 不存在,我可以返回。当创建真正的 observable 时,我使用createSubject.onNext(realObservable),然后将其传递给switchLatest()为任何订阅者解包它。

但是,为此目的使用 Subject 和 switchLatest 似乎有点矫枉过正,所以我想知道是否有更直接的解决方案?

4

2 回答 2

2

如果获取 observable 本身的行为是异步的,那么您也应该将其建模为 observable。

例如...

var getObsAsync = function () {
    return Rx.Observable.create(function (observer) {
        var token = startSomeAsyncAction(function (result) {
                // the async action has completed!
                var obs = Rx.Observable.fromArray(result.dataArray);
                token = undefined;
                observer.OnNext(obs);
                observer.OnCompleted();
            }),
            unsubscribeAction = function () {
                if (asyncAction) {
                    stopSomeAsyncAction(token);
                }
            };            

        return unsubscribeAction;
    });
};

var getObs = function () { return getObsAsync().switchLatest(); };

如果您想共享该 observable 的单个实例,但不希望在有人实际订阅之前获得 observable ,那么您可以:

// source must be a Connectable Observable (ie the result of Publish or Replay)
// will connect the observable the first time an observer subscribes
// If an action is supplied, then it will call the action with a disposable
// that can be used to disconnect the observable.
// idea taken from Rxx project
Rx.Observable.prototype.prime = function (action) {
    var source = this;
    if (!(source instanceof Rx.Observable) || !source.connect) {
        throw new Error("source must be a connectable observable");
    }

    var connection = undefined;
    return Rx.Observable.createWithDisposable(function (observer) {
        var subscription = source.subscribe(observer);

        if (!connection) {
            // this is the first observer.  Connect the underlying observable.
            connection = source.connect();
            if (action) {
                // Call action with a disposable that will disconnect and reset our state
                var disconnect = function() {
                    connection.dispose();
                    connection = undefined;
                };
                action(Rx.Disposable.create(disconnect));
            }
        }

        return subscription;
    });
};

var globalObs = Rx.Observable.defer(getObs).publish().prime();

现在在任何地方都可以使用 globalObs 而不必担心它:

// location 1
globalObs.subscribe(...);

// location 2
globalObs.select(...)...subscribe(...);

请注意,实际上没有人需要调用getObs,因为您只需设置一个全局可观察对象,当有人订阅时(通过defer)调用您。getObs

于 2013-07-09T16:51:32.203 回答
1

您可以在事后使用主题来连接源:

var placeholder = new Subject<YourType>();
// other code can now subscribe to placeholder, best expose it as IObservable

创建源时:

var asyncCreatedObs = new ...;
placeholder.Subscribe(asyncCreatedObs);
// subscribers of placeholder start to see asyncCreatedObs 
于 2013-07-09T14:33:22.850 回答