15

我有两个行为主题流,我试图 forkJoin 没有运气。正如我想象的那样,它返回了它的最后两个值。这有可能以某种方式实现吗?

它不是在主题之后调用的。

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });
4

3 回答 3

22

请注意forkJoin()其文档中的实际操作:

等待 Observables 完成,然后组合它们发出的最后一个值。

这意味着当所有输入 Observable都完成forkJoin()时会发出一个值。当使用这意味着显式调用它们时:BehaviorSubjectcomplete()

import { Observable, BehaviorSubject, forkJoin } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

forkJoin(stream1, stream2)
  .subscribe(r => {
    console.log(r);
  });

stream1.complete();
stream2.complete();

查看现场演示:https ://stackblitz.com/edit/rxjs-9nqtx6

2019 年 3 月:针对 RxJS 6 进行了更新。

于 2016-09-27T14:18:12.250 回答
20

如果您不想(或不知道何时)调用complete(),可以使用combineLatest代替forkJoin

使用combineLatest,当任何可观察的源(在您的情况下,您的行为主体)发出一个值时,combineLatest将触发:

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

combineLatest(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });

stream1.next(3);
stream2.next('three');

控制台日志:

(2) [2, "two"] // 初始状态

(2) [3, "two"] // 下一个在 stream1 上触发

(2) [3, "three"] // 下一个在 stream2 上触发

现场演示:https ://stackblitz.com/edit/rxjs-qzxo3n

于 2020-02-27T12:12:18.970 回答
3

您可以使用上面提到的take(1)管道或方法。complete()

private subjectStream1 = new BehaviorSubject(null);
stream1$: Observable = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject(null);
stream2$: Observable = this.subjectStream2.asObservable();

forkJoin({
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
})
.pipe(takeUntil(this._destroyed$))
.subscribe(values) => console.log(values));
于 2021-05-13T10:15:32.303 回答