0

我开发了一个 Angular 9 应用程序,但我不明白如何将shareReplay运算符与其他运算符一起使用。我做了类似以下的事情:

if (!this.cache[key]) {
  this.cache[key] = this.http.get(...).pipe(
    shareReplay(1),
    flatMap(...),
    map(...),
    reduce(...)
  );
}
return this.cache[key];

在此之后,我的应用程序卡住了 100% 的 CPU 使用率。当我将其更改为:

if (!this.cache[key]) {
  this.cache[key] = this.http.get(...).pipe(
    flatMap(...),
    map(...),
    reduce(...),
    shareReplay(1)
  );
}
return this.cache[key];

它似乎工作正常。是否有必要使用shareReplay运算符作为最后一个?这么高的 CPU 使用率从何而来?

编辑:更详细的代码片段:

this.http.get(...).pipe(
  // I would like to avoid several same http calls
  shareReplay(1),
  // I would like to flatten an array of objects that comes from backend
  flatMap(option => option),
  // I need to map all objects to a format that is acceptable by some library
  map(option => ({
    value: option.key,
    label: option.value
  })),
  // I must reduce it back to an array of the new objects
  reduce((acc: { value: string; label: string }[], option) => {
    acc.push(option);
    return acc;
  }, [])
);
4

1 回答 1

2

运营商在管道中的位置share很重要,因为它决定了有多少订阅者将订阅之前的运营商share

举个例子:

const interval$ = interval(1000).pipe(
  tap(() => console.log("Interval Triggered")
);

interval$.subscribe();
interval$.subscribe();

您将看到我们每秒收到两条控制台消息。这是有道理的,因为在 RxJS 中,每个订阅者(“消费者”)都会导致工厂创建一个新的“生产者”(这里是 a setInterval)。在这种情况下 - 没有share运营商 - 每个订阅者都将沿着运营商链向上并订阅前一个运营商 - 直到我们订阅interval两次。

现在,让我们来看看share

const interval$ = interval(1000).pipe(
  tap(() => console.log("Interval Triggered"),
  share()
);

interval$.subscribe();
interval$.subscribe();

在这个版本中,我们每秒只会收到一条消息。为什么?

因为share只要有一个活跃的订阅者,就不会订阅它的来源。所以,我们订阅了share()两次,但share()只订阅了interval一次。

如果我们切换位置,我们会得到不同的结果:

const interval$ = interval(1000).pipe(
  share(),
  tap(() => console.log("Interval Triggered"),
);

interval$.subscribe();
interval$.subscribe();

在这种情况下,我们订阅了tap两次——这两个tap运营商订阅了share两次。这就是我们每秒收到两条控制台消息的原因。

根据经验,您很可能希望您share在管道的末端,因为这将导致之前的操作员share仅被订阅一次。


解释你的 CPU 消耗:我怀疑你经常订阅这个 Observable——share不是最后一个操作员,这意味着你会一遍又一遍地重新订阅flatMap等等。

于 2020-08-03T16:34:44.827 回答