我正在尝试创建一个定期轮询后端的轮询系统。
我有一个方法add
,它采用一个 pollingItem ,其中包含作为字符串的命令和间隔的 refreshTime,并将从间隔创建的 observable 传递给主题。
this.sideEffect
是传递给与后端通信的 init 类构造函数的函数。
我想创建两种类型的间隔:
调用 IntervalMap 中链接到命令的主题时可以停止的默认选项。
可以以与上述相同的方式停止的“SWITCH”,但也可以在添加新的 pollingItem 时停止。
add(pollingItem, type = 'DEFAULT') {
this.intervalMap = { ...this.intervalMap, [pollingItem.command]: new Subject() };
switch (type) {
case 'SWITCH': {
const poller = interval(pollingItem.refreshTime).pipe(
takeUntil(this.intervalMap[pollingItem.command]),
takeUntil(this.onAdd$),
tap(() => {
this.sideEffect(pollingItem.command);
})
);
this.poll$.next(poller);
break;
}
default: {
const poller = interval(pollingItem.refreshTime).pipe(
takeUntil(this.intervalMap[pollingItem.command]),
tap(() => {
this.sideEffect(pollingItem.command);
})
);
this.poll$.next(poller);
break;
}
}}
我的问题是我想要一种方法来组合poll$
随着时间的推移传入的可观察值。
目前我已经尝试通过这种方式实现这个目标:
initialize() {
this.poll$
.pipe(
takeUntil(this.clear$),
scan((agg, current) => {
return [...agg, current];
}, [])
)
.subscribe(
poll => {
combineLatest(poll)
.pipe(takeUntil(this.onComplete$))
.subscribe(() => {
this.onAdd$.next();
});
},
() => {},
() => {
this.onComplete$.next();
}
);}
然而,尽管这确实实现了随着时间的推移组合 observables 的目标,但它是有限的,因为它会在添加时重新启动所有间隔,这不是预期的行为。
我不知道从这里去哪里。
有什么建议么?我对 RxJS 还是新手,所以任何建议都会非常感激,无论它是否与主题相关。
谢谢!