我正在使用 RxJS 6 创建可管道操作符complete()
,并且不清楚当操作是异步时如何观察观察者。
对于同步操作,逻辑很简单。在下面的示例中,来自源的所有值Observable
都将传递给observer.next()
,然后observer.complete()
被调用。
const syncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => observer.next(x),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
但是,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 的调用表示setTimeout()
。显然,observer.complete()
将在任何值传递给observer.next()
.
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => setTimeout(() => observer.next(x), 100),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
所以问题是:什么是惯用的 RxJS 方法,以便observer.complete()
仅在所有值都异步传递给之后才进行调用observer.next()
?我应该手动跟踪待处理的呼叫还是有更“被动”的解决方案?
(请注意,上面的示例是对我的实际代码的简化,并且调用setTimeout()
旨在表示“任何异步操作”。我正在寻找一种通用方法来处理可管道运算符中的异步操作,而不是关于如何处理的建议处理 RxJS 中的延迟或超时。)