3

我有一个 Observable,其中每个新值都应该引发一个 HTTP 请求。在客户端,我只关心最新的响应值;但是,我希望完成每个请求以进行监控/等。目的。

我目前拥有的是这样的:

function simulate(x) {
  // Simulate an HTTP request.
  return of(x).pipe(delay(6));
}

source$.pipe(
  someMapFunc(x => simulate(x)),
);

当我使用switchMapfor 时someMapFunc,我得到了正确的响应集(只有最新的)。但是,如果请求花费的时间过长,它将被取消。

当我mergeMap改为使用时,我得到了正确的请求集(每个请求都完成),但我得到了错误的响应集(每一个)。

上面代码的大理石图

mergeMap有没有办法通过响应来获取请求switchMap我知道我可以将其编写为自定义运算符,但我想知道是否可以使用现有/标准 rxjs 运算符构建它。总结一下我的想法:

  • switchMap切换时不会取消订阅的版本;
  • 它的一个版本mergeMap只从最新的内部 Observable 发出值。

编辑:根据接受的答案,我能够得到以下内容,这很有效:

function orderedMergeMap(project) {
  return (s) => defer(() => {
    let recent = 0;
    return s.pipe(
      mergeMap((data, idx) => {
        recent = idx;
        return project(data).pipe(filter(() => idx === recent));
      })
    );
  });
}
4

2 回答 2

4

我不能 100% 确定这是否是您所追求的,而且我还没有完全测试过这一点,但我创建了一个自定义运算符,它可能会做一些接近您所追求的事情。也许你可以多修修补补。

这是一个过滤掉“旧”值的合并映射。旧值是新源开始排放后发生的源排放。

function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => {
    let recent = 0;
    return s.pipe(
      map((v, i) => ({order: i, payload: v})),
      mergeMap(({order, payload}) => project(payload).pipe(
        map(v => ({order, payload: v}))
      )),
      tap(({order}) => {
        if(order > recent) recent = order;
      }),
      filter(({order}) => order < recent),
      map(({payload}) => payload)
    );
  });
}

版本 OP 确定:

function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => { 
    let recent = 0; 
    return s.pipe( 
      mergeMap((data, idx) => { 
        recent = idx; 
        return project(data).pipe(
          filter(() => idx === recent)
        ); 
      }) 
    ); 
  }); 
}
于 2021-11-23T05:13:14.997 回答
2

我相信您需要concatMap()和的组合last()

concatMap在前一个完成之前不会订阅下一个 observable。使用它,您将确保请求执行的顺序。从描述中可以看出,它不会取消以前的订阅并让它们完成,不像switchMap.

last在完成时发出从源发出的最后一个值。使用它,您将确保只有一个(最后一个)结果将传递给结果。

您的代码将如下所示:

source$.pipe(
  concatMap(x => simulate(x)),
  last()
);
于 2021-11-23T01:01:01.360 回答