0

我正在尝试创建一个智能队列,该队列每秒或每次调用时收集数据,.next()并且在每 x 秒后(例如 x=5),它调度current队列中的所有项目,同时仍接收新项目。

整个想法是每隔x=5几秒轮询一次对服务器的 http 请求,这样就不是每秒发送这些项目,而是一次发送一批项目。

下面的片段是我尝试过的。我正在使用扫描运算符累积值,但我需要每 x=5 秒从队列中获取的组合。

import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, mergeMap, map, delay, scan } from 'rxjs/operators'
import { Video, seconds } from '../types'

interface Queue {
  addMoment: (data: { t: seconds }) => void
  unsubscribe: () => void
}

type Moment = {
  t: seconds
}

const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000

export class MomentsQueue implements Queue {
  private queue = new Subject()
  private observer: Observable<any>
  private subscriber: Subscription | undefined

  constructor(private video: Video) {
    this.observer = this.queue.pipe(
      map((payload) => {
        console.log(payload)
        return payload as Moment
      }),
      scan((all: Moment[], current) => [...all, current], []),
      concatMap(
        (payload) =>
          new Promise((resolve) => {
            console.log({ payload })
            resolve(true)
          }),
      ),
    )
  }

  private subscribe() {
    this.subscriber = this.observer.subscribe()
    // ?? this.unsubscribe() // based on any chosen event
  }

  unsubscribe() {
    this.subscriber?.unsubscribe()
  }

  addMoment(data: Moment) {
    if (!this.subscriber || this.subscriber.closed) this.subscribe()

    this.queue.next({ t: data.t })
  }
}

export default MomentsQueue


4

1 回答 1

1

我能够使用@TalOhania 的帮助解决这个问题。请参阅下面代码片段中的解决方案:

import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, map, bufferTime } from 'rxjs/operators'
import { Video, seconds } from '../types'

interface Queue {
  addMoment: (data: { t: seconds }) => void
  unsubscribe: () => void
}

type Moment = {
  t: seconds
}

const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000

export class MomentsQueue implements Queue {
  private queue = new Subject()
  private observer: Observable<any>
  private subscriber: Subscription | undefined

  constructor(private video: Video) {
    this.observer = this.queue.pipe(
      map((payload) => payload as Moment),
      bufferTime(INTERVAL),
      concatMap(
        (payload) =>
          new Promise((resolve) => {
            // send the payload to the server.
            console.log({ payload })
            resolve(true)
          }),
      ),
    )
  }

  private subscribe() {
    this.subscriber = this.observer.subscribe()
    // ?? this.unsubscribe() // based on any chosen event
  }

  unsubscribe() {
    this.subscriber?.unsubscribe()
  }

  addMoment(data: Moment) {
    if (!this.subscriber || this.subscriber.closed) this.subscribe()

    this.queue.next({ t: data.t })
  }
}

export default MomentsQueue
于 2020-07-23T18:37:04.767 回答