0

我有一个 observable,它将从 SignalR 集线器接收多个实时交易值(可能每秒很多)。我想要实现的是一个连续(每 10 秒)输出最后 10 秒内发生的 5 笔交易的样本的可观察值。

我编写了一个可观察的管道来尝试通过将所有接收到的交易添加到缓冲区中 10 秒来实现这一点,然后使用“concatMap”和“from”为缓冲区数组中的每个交易创建一个可观察的对象。然后,创建另一个缓冲区,收集 5 个值并发出它们。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

但是,管道不断发出它在 10 秒窗口中接收到的所有值,但以 5 个为一组。我尝试take(5)在 concat 映射之后在管道中添加一个,它对于第一批 5 个值可以正常工作,但是然后可观察的“完成”并停止侦听新值。我还尝试在 concatMap 之后添加一个带有索引的过滤器,如下所示:

filter((v, i) => (i < 6 )),

这适用于第一批 5 个值,但随后会不断过滤掉每个值,因此永远不会创建第二个 5 个缓冲区。此外,过滤器的这个用例似乎已被弃用。

我不确定我是否在这里忽略了一些明显的东西,但是我查看了许多 rxjs 运算符并且找不到实现此目的的方法

4

3 回答 3

2

bufferTime有一个maxBufferSize论据可以为你做到这一点。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );

您也可以windowTime改为在创建每个值后立即输出它,而不是等待所有 5 个值。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll()
        tap(v => console.log('pipe-end: ', v))
      );

这些分别在 和 的文档中进行了bufferTime介绍windowTime

于 2022-02-23T22:00:48.150 回答
2

听起来你只需要bufferTime。您可以决定保留什么,然后丢弃什么。

this.bufferedTradeObservable$ = this.tradeReceived.pipe(
  // Buffer for 1 seconds
  bufferTime(10000),
  // Only emit the last 5 values from the buffer.
  map(buffer => buffer.slice(-5))
);
于 2022-02-24T13:42:34.103 回答
2

这样的事情怎么办

let n = 5;
let t = 10;

//Source, emits a value every second (just a placeholder for real source)

const source = interval(1000);

//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

//Every t=10 seconds switch to a new observable that emits n=5 values and then closes

const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

//Subscribe and log n=5 values every t=10 seconds

takeNValuesEveryTSeconds.subscribe(n => 
  console.log('Value => ', n)
);
于 2022-02-23T21:50:04.803 回答