7

我正在构建一个 Angular2 应用程序,并且有两个BehaviourSubjects我想在逻辑上组合成一个订阅。我正在发出两个 http 请求,并希望在它们都回来时触发一个事件。我在看forkJoinvs combineLatest。似乎 combineLatest 会在任何一个 behvaviorSubjects 更新时触发,而 forkJoin 只会在所有 behavoirSubjects 都更新后触发。这个对吗?必须有一个普遍接受的模式,不是吗?

编辑
这是我的 angular2 组件订阅的一个behaviorSubjects 示例:

export class CpmService {

    public cpmSubject: BehaviorSubject<Cpm[]>;

    constructor(private _http: Http) {
        this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
    }

    getCpm(id: number): void {
        let params: URLSearchParams = new URLSearchParams();
        params.set('Id', id.toString());

        this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .subscribe(_cpm => {
                this.cpmSubject.subscribe(cpmList => {
                    //double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres
                    if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                        cpmList.push(_cpm);
                        this.cpmSubject.next(cpmList);
                    }
                })
            });
    }
}

这是我的组件订阅的片段:

  this._cpmService.cpmSubject.subscribe(cpmList => {
      doSomeWork();
  });

但是,我不想在单个订阅上触发 doSomeWork(),而是只想在 cpmSubject 和 fooSubject 触发时触发 doSomeWork()。

4

1 回答 1

14

您可以使用zip-operator,其工作方式类似于 combineLatest 或 forkJoin,但仅在两个流都发出时触发:http ://reactivex.io/documentation/operators/zip.html

zip和之间的区别combineLatest是: Zip 只会触发“并行”,而combineLatest将触发任何更新并发出每个流的最新值。因此,假设以下 2 个流:

streamA => 1--2--3
streamB => 10-20-30

zip

  • “1、10”
  • “2、20”
  • “3、30”

combineLatest

  • “1、10”
  • “2、10”
  • “2、20”
  • “3、20”
  • “3、30”

这也是一个活生生的例子:

const a = new Rx.Subject();
const b = new Rx.Subject();

Rx.Observable.zip(a,b)
  .subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
  .subscribe(x => console.log("combineLatest: " + x.join(", ")));

a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>


另一个旁注:永远不要在订阅内订阅。改为执行以下操作:

this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .withLatestFrom(this.cpmSubject)
            .subscribe([_cpm, cpmList] => {
                if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                    cpmList.push(_cpm);
                    this.cpmSubject.next(cpmList);
                }
            });
于 2017-03-16T17:45:47.747 回答