我有一个 observable,它将从 SignalR 集线器接收多个实时交易值(可能每秒很多)。我想要实现的是一个连续(每 10 秒)输出最后 10 秒内发生的 5 笔交易的样本的可观察值。
我编写了一个可观察的管道来尝试通过将所有接收到的交易添加到缓冲区中 10 秒来实现这一点,然后使用“concatMap”和“from”为缓冲区数组中的每个交易创建一个可观察的对象。然后,创建另一个缓冲区,收集 5 个值并发出它们。
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000),
concatMap((tradeArray) => {
return from(tradeArray);
}),
bufferCount(5),
tap(v => console.log('pipe-end: ', v))
);
但是,管道不断发出它在 10 秒窗口中接收到的所有值,但以 5 个为一组。我尝试take(5)
在 concat 映射之后在管道中添加一个,它对于第一批 5 个值可以正常工作,但是然后可观察的“完成”并停止侦听新值。我还尝试在 concatMap 之后添加一个带有索引的过滤器,如下所示:
filter((v, i) => (i < 6 )),
这适用于第一批 5 个值,但随后会不断过滤掉每个值,因此永远不会创建第二个 5 个缓冲区。此外,过滤器的这个用例似乎已被弃用。
我不确定我是否在这里忽略了一些明显的东西,但是我查看了许多 rxjs 运算符并且找不到实现此目的的方法