我有一个 Observable,其中每个新值都应该引发一个 HTTP 请求。在客户端,我只关心最新的响应值;但是,我希望完成每个请求以进行监控/等。目的。
我目前拥有的是这样的:
function simulate(x) {
// Simulate an HTTP request.
return of(x).pipe(delay(6));
}
source$.pipe(
someMapFunc(x => simulate(x)),
);
当我使用switchMap
for 时someMapFunc
,我得到了正确的响应集(只有最新的)。但是,如果请求花费的时间过长,它将被取消。
当我mergeMap
改为使用时,我得到了正确的请求集(每个请求都完成),但我得到了错误的响应集(每一个)。
mergeMap
有没有办法通过响应来获取请求switchMap
?我知道我可以将其编写为自定义运算符,但我想知道是否可以使用现有/标准 rxjs 运算符构建它。总结一下我的想法:
switchMap
切换时不会取消订阅的版本;- 它的一个版本
mergeMap
只从最新的内部 Observable 发出值。
编辑:根据接受的答案,我能够得到以下内容,这很有效:
function orderedMergeMap(project) {
return (s) => defer(() => {
let recent = 0;
return s.pipe(
mergeMap((data, idx) => {
recent = idx;
return project(data).pipe(filter(() => idx === recent));
})
);
});
}