combineLatest运算符返回一个 Observable,当所有作为参数传入的 observable 完成时,该 ObservablecombineLatest完成。
有没有办法创建一个 Observable,它的行为与返回的 ObservablecombineLatest相同,唯一的区别是它在第一个作为参数传递的 Observable 完成时完成?
combineLatest运算符返回一个 Observable,当所有作为参数传入的 observable 完成时,该 ObservablecombineLatest完成。
有没有办法创建一个 Observable,它的行为与返回的 ObservablecombineLatest相同,唯一的区别是它在第一个作为参数传递的 Observable 完成时完成?
是的,你可以;你可以这样做:
function combineLatestUntilFirstComplete(...args) {
const shared = args.map(a => a.share());
return Rx.Observable
.combineLatest(...shared)
.takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}
const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);
combineLatestUntilFirstComplete(a, b).subscribe(
value => console.log(JSON.stringify(value)),
error => console.error(error),
() => console.log("complete")
);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
该实现从内部调用返回的可观察对象中获取值,combineLatest直到其中一个源可观察对象发出其最后一个值。
请注意,源 observables 是共享的,因此由于takeUntil调用而产生的订阅不会影响对冷源 observables 的二次订阅。
const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();
const combined = Rx.Observable.combineLatest(a, b, c)
.takeUntil(
Rx.Observable.merge(
a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
)
);
combined
.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>
您需要.share()输入可观察对象,因为您需要它们两次,一次用于.combineLatest(),一次用于.takeUntil完成您的可观察流。
我使用.ignoreElements() in.takeUntil忽略任何值,并且仅当源流(或a、b或c).concat向它完成“最终”消息以向 发出信号以.takeUntil完成我们对.combineLatest. a,b,c如果其中一个不发出任何值,这也有效。