0

我正在尝试创建一个定期轮询后端的轮询系统。

我有一个方法add,它采用一个 pollingItem ,其中包含作为字符串的命令和间隔的 refreshTime,并将从间隔创建的 observable 传递给主题。

this.sideEffect是传递给与后端通信的 init 类构造函数的函数。

我想创建两种类型的间隔:

调用 IntervalMap 中链接到命令的主题时可以停止的默认选项。

可以以与上述相同的方式停止的“SWITCH”,但也可以在添加新的 pollingItem 时停止。

add(pollingItem, type = 'DEFAULT') {
this.intervalMap = { ...this.intervalMap, [pollingItem.command]: new Subject() };
switch (type) {
  case 'SWITCH': {
    const poller = interval(pollingItem.refreshTime).pipe(
      takeUntil(this.intervalMap[pollingItem.command]),
      takeUntil(this.onAdd$),
      tap(() => {
        this.sideEffect(pollingItem.command);
      })
    );
    this.poll$.next(poller);
    break;
  }
  default: {
    const poller = interval(pollingItem.refreshTime).pipe(
      takeUntil(this.intervalMap[pollingItem.command]),
      tap(() => {
        this.sideEffect(pollingItem.command);
      })
    );
    this.poll$.next(poller);
    break;
  }
}}

我的问题是我想要一种方法来组合poll$随着时间的推移传入的可观察值。

目前我已经尝试通过这种方式实现这个目标:

initialize() {
this.poll$
  .pipe(
    takeUntil(this.clear$),
    scan((agg, current) => {
      return [...agg, current];
    }, [])
  )
  .subscribe(
    poll => {
      combineLatest(poll)
        .pipe(takeUntil(this.onComplete$))
        .subscribe(() => {
          this.onAdd$.next();
        });
    },
    () => {},
    () => {
      this.onComplete$.next();
    }
  );}

然而,尽管这确实实现了随着时间的推移组合 observables 的目标,但它是有限的,因为它会在添加时重新启动所有间隔,这不是预期的行为。

我不知道从这里去哪里。

有什么建议么?我对 RxJS 还是新手,所以任何建议都会非常感激,无论它是否与主题相关。

谢谢!

4

0 回答 0