1

比方说,我有一个动作流。每个动作都分配了一些 id。像这样:

const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });

现在,对于每个动作,我想在 switchMap 中执行一些逻辑:

actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);

问题是每个发出的动作都会取消以前的“一些可取消的逻辑”。

是否可以根据操作id取消“一些可取消的逻辑” ,最好是操作员?就像是:

actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)

本质上,switchMap的当前行为
1. actions$发出 id #1。switchMap订阅嵌套的 observable。
2. actions$发出 id #2。switchMap取消订阅之前的嵌套 observable。订阅新的。
3. actions$发出 id #1。switchMap再次取消订阅之前的嵌套 observable。订阅新的。

预期行为
1. actions$发出 id #1。switchMap订阅嵌套的 observable。
2. actions$发出 id #2。switchMap再次订阅嵌套的 observable(这次是 #2)。这就是区别,它不会取消来自 #1 的那个
3. actions$发出 id #1。switchMap取消订阅 #1 的嵌套 observable。再次订阅,#1。

4

2 回答 2

4

这似乎是 mergeMap 运算符的用例。switchMap 的用例是只维护一个内部订阅并取消以前的订阅,这不是您要的。您需要多个内部订阅,并且您希望它们在相同 id 的新值出现时取消,因此实现一些自定义逻辑来做到这一点。

类似于:

action$.pipe(
  mergeMap(val => {
    return (/* your transform logic here */)
              .pipe(takeUntil(action$.pipe(filter(a => a.id === val.id)))); // cancel it when the same id comes back through, put this operator at the correct point in the chain
  })
)

您可以通过编写自定义运算符将其变成可重复使用的东西:

import { OperatorFunction, Observable, from } from 'rxjs';
import { takeUntil, filter, mergeMap } from 'rxjs/operators';

export function switchMapBy<T, R>(
  key: keyof T,
  mapFn: (val: T) => Observable<R> | Promise<R>
): OperatorFunction<T, R> {
  return input$ => input$.pipe(
    mergeMap(val => 
      from(mapFn(val)).pipe(
        takeUntil(input$.pipe(filter(i => i[key] === val[key])))
      )
    )
  );
}

并像这样使用它:

action$.pipe(
  switchMapBy('id', (val) => /* your transform logic here */)
);

这是它的闪电战:https ://stackblitz.com/edit/rxjs-x1g4vc?file=index.ts

于 2019-07-06T21:01:04.360 回答
0

在 switchMap 之前使用过滤器操作来排除取消的 id,像这样

of({ id: 1 }, { id: 2 }, { id: 1 }).pipe(
   filter(id => ![1,2].includes(id)), // to exclude ids (1,2)
   switchMap(id => /*some cancellable logic */ )
).subscribe(...)
于 2019-07-06T20:26:27.230 回答