
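I need to cache the result of a request on the first call, then read the cached value on subsequent calls.

To achieve this, I am using Promises and chaining them. I have a working solution, but I would like to convert it to RxJS observables instead of Promises.

Here is my working solution: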

private currentPromise: Promise<any>;
private cache: { [key: string]: any } = {};

public getSomething(name: string): Promise<any> {
  // Chain each call behind the previous one so that a later call
  // sees the cache entry filled in by an earlier call.
  return this.currentPromise = !this.currentPromise ?
    this._getSomething(name) :
    this.currentPromise.then(() => this._getSomething(name));
}

private _getSomething(name: string): Promise<any> {
  return new Promise((resolve) => {
    if (this.cache[name]) {
      this.messages.push("Resolved from cache");
      resolve(this.cache[name]);
    } else {
      // Fake http call. I would use Angular's Http class.
      setTimeout(() => {
        this.messages.push("Resolved from server");
        this.cache[name] = name;
        resolve(this.cache[name]);
      }, 2000);
    }
  });
}

this.getSomething("thing1").then((res)=>this.messages.push(res));
this.getSomething("thing1").then((res)=>this.messages.push(res));
this.getSomething("thing2").then((res)=>this.messages.push(res));
this.getSomething("thing2").then((res)=>this.messages.push(res));
this.getSomething("thing1").then((res)=>this.messages.push(res));
this.getSomething("thing2").then((res)=>this.messages.push(res));
this.getSomething("thing1").then((res)=>this.messages.push(res));
this.getSomething("thing2").then((res)=>this.messages.push(res));

You can test it on this plunkr: https://plnkr.co/edit/j1pm2GeQf6oZwRvbUsXJ?p=preview
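For comparison, the caching part of the goal can be sketched in plain TypeScript by storing the in-flight promise per name instead of the resolved value. This is only a sketch under assumptions: `PromiseCache`, `fakeFetch`, and `serverCalls` are hypothetical names that do not appear in the code above, and unlike the chained version it does not serialize requests for different names.

```typescript
// Hypothetical helper: caches the promise itself, keyed by name,
// so concurrent callers for the same name share one request.
class PromiseCache<T> {
  private readonly entries = new Map<string, Promise<T>>();
  private readonly fetcher: (name: string) => Promise<T>;

  constructor(fetcher: (name: string) => Promise<T>) {
    this.fetcher = fetcher;
  }

  get(name: string): Promise<T> {
    let entry = this.entries.get(name);
    if (entry === undefined) {
      // First call for this name: start the request and remember the promise.
      entry = this.fetcher(name);
      this.entries.set(name, entry);
    }
    return entry;
  }
}

// Fake server call (assumption): resolves with the name after a short delay.
let serverCalls = 0;
const fakeFetch = (name: string): Promise<string> =>
  new Promise<string>(resolve => {
    serverCalls++;
    setTimeout(() => resolve(name), 20);
  });

const cache = new PromiseCache<string>(fakeFetch);
const first = cache.get("thing1");
const second = cache.get("thing1"); // same promise as `first`, no new request
const other = cache.get("thing2");  // different name, one new request
```

Because the promise is stored before it resolves, a second call made while the first request is still pending reuses the same promise rather than starting another request.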

How can I achieve the same thing with RxJS 5 beta?

Update

Following Bergi's comment, I updated my plunkr and my code to bring them closer to my real case.


1 Answer


AsyncSubject is the Rx analog of a Promise. publishLast is the best way to turn an observable into one. Something like this should work:

private cache: { [name: string]: Rx.Observable<any> } = {};

public getSomethings(names: string[]) : Rx.Observable<any> {
    // call getSomething for each entry in names
    // streams is an array of observables
    const streams = names.map(name => this.getSomething(name));

    // transform streams into an observable with an array of results
    return Rx.Observable.zip(...streams);
}

public getSomething(name: string) : Rx.Observable<any> {
    if (!this.cache[name]) {
        // create the request observable
        // const request = Rx.Observable.ajax(...); // http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-ajax
        // for now, just wait 2 seconds and return name
        const request = Rx.Observable.of(name).delay(2000);

        // use "do" to log whenever this raw request produces data
        const loggedRequest = request.do(v => this.messages.push("retrieved from server " + v));

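        // create an observable that caches the result
        // in an AsyncSubject; publishLast() returns a ConnectableObservable,
        // so the request only starts once connect() is called
        const cachedRequest = loggedRequest.publishLast();
        cachedRequest.connect();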

        // store this in our cache object
        this.cache[name] = cachedRequest;
    }

    // return the cached async subject
    return this.cache[name];
}


// usage
this.getSomething("thing1").subscribe(v => this.messages.push("received " + v));
this.getSomething("thing1").subscribe(v => this.messages.push("received " + v));
this.getSomething("thing1").subscribe(v => this.messages.push("received " + v));
this.getSomething("thing1").subscribe(v => this.messages.push("received " + v));
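
To make the Promise analogy concrete, here is a rough, dependency-free sketch of the behaviour an AsyncSubject provides: it keeps only the last value, emits it on completion, and replays it to anyone who subscribes afterwards. `MiniAsyncSubject` is a hypothetical, simplified stand-in written for illustration, not the RxJS class (it has no error handling or unsubscription).

```typescript
// Hypothetical, simplified stand-in for an AsyncSubject; not the RxJS class.
type Listener<T> = (value: T) => void;

class MiniAsyncSubject<T> {
  private listeners: Listener<T>[] = [];
  private hasValue = false;
  private lastValue: T | undefined = undefined;
  private completed = false;

  // Remember the most recent value, but do not emit it yet.
  next(value: T): void {
    if (this.completed) return;
    this.lastValue = value;
    this.hasValue = true;
  }

  // On completion, deliver the last value (if any) to everyone waiting.
  complete(): void {
    if (this.completed) return;
    this.completed = true;
    const waiting = this.listeners;
    this.listeners = [];
    if (this.hasValue) {
      for (const listener of waiting) listener(this.lastValue as T);
    }
  }

  // Subscribers that arrive after completion get the cached value at once.
  subscribe(listener: Listener<T>): void {
    if (this.completed) {
      if (this.hasValue) listener(this.lastValue as T);
    } else {
      this.listeners.push(listener);
    }
  }
}

const subject = new MiniAsyncSubject<string>();
const received: string[] = [];

subject.subscribe(v => received.push("early " + v));
subject.next("draft");   // overwritten before completion, never delivered
subject.next("thing1");
subject.complete();      // the early subscriber receives "thing1" here
subject.subscribe(v => received.push("late " + v)); // replayed immediately
```

Like a resolved Promise, the completed subject hands the same single value to every subscriber no matter when it subscribes, which is why it works as a per-name cache entry.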
answered 2016-04-20T16:27:05.947