我正在尝试创建一个智能队列,该队列每秒或每次调用时收集数据,.next()
并且在每 x 秒后(例如 x=5),它调度current
队列中的所有项目,同时仍接收新项目。
整个想法是每隔x=5
几秒轮询一次对服务器的 http 请求,这样就不是每秒发送这些项目,而是一次发送一批项目。
下面的片段是我尝试过的。我正在使用扫描运算符累积值,但我需要每 x=5 秒从队列中获取的组合。
import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, mergeMap, map, delay, scan } from 'rxjs/operators'
import { Video, seconds } from '../types'
interface Queue {
addMoment: (data: { t: seconds }) => void
unsubscribe: () => void
}
type Moment = {
t: seconds
}
const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000
export class MomentsQueue implements Queue {
private queue = new Subject()
private observer: Observable<any>
private subscriber: Subscription | undefined
constructor(private video: Video) {
this.observer = this.queue.pipe(
map((payload) => {
console.log(payload)
return payload as Moment
}),
scan((all: Moment[], current) => [...all, current], []),
concatMap(
(payload) =>
new Promise((resolve) => {
console.log({ payload })
resolve(true)
}),
),
)
}
private subscribe() {
this.subscriber = this.observer.subscribe()
// ?? this.unsubscribe() // based on any chosen event
}
unsubscribe() {
this.subscriber?.unsubscribe()
}
addMoment(data: Moment) {
if (!this.subscriber || this.subscriber.closed) this.subscribe()
this.queue.next({ t: data.t })
}
}
export default MomentsQueue