0

我是 RxJS 世界的新手,对此我有点迷茫。希望有人可以帮助我。

我有一个可观察的源(通过 AngularFire 的 Firebase),它在随机派克时间内不断向我发出大量数据(在 2 秒的窗口中最多发出 50 或 80 次),因为这会减慢我的项目性能,我认为是正确的处理这个问题的方法是将排放分组到一个数组中,然后对接收到的所有数据进行事务处理并将其插入到存储中。

我正在寻找的结果如下所示:

考虑到我会放置 3 秒的“保持”时间,我想要以下结果:

           1s     1.5s
--> 30 --> 60 --> 100

          1s           2s
--> 5 --> 1 --> 50 --> 70
  1. [30, 60, 100] --> 以 1.5s 的间隔时间
  2. [5, 1, 50, 70] --> 以 2s 间隔时间

数组中的值将是从收到的第一个发射开始的特定时间收到的发射。在特定时间之后,它将在下一批排放中“重新启动”初始化(实际上可能在 1 秒或 2 小时内,但随后,间隔将触发 2 秒的排放)

到目前为止我尝试过的是使用 Window 和 Buffer,也许我没有正确使用这些,或者我只是愚蠢但我找不到我刚刚解释的结果。

filter((snapshot) => { if (snapshot.payload.val().reference) { return snapshot; } }),
window(interval(2000)),
mergeAll(),
withTransaction((snapshots:[]) => {
   snapshots.forEach(snapshot => {
     if (snapshot.type === 'child_changed') {

       this.store.add(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_changed') {

       this.store.replace(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_removed') {

       this.store.remove(snapshot.key);

     }
   })
})

我什至不知道 RxJS 是否有可能(我想是的。我已经看到了很多很酷的东西),但是任何建议或指导如何通过它,将不胜感激。

提前非常感谢!

注意: withTransaction 是一个自定义操作符。

4

1 回答 1

1

不肯定你在追求什么,但似乎你想要bufferTime

const source = timer(0, 500);
const buffered = source.pipe(bufferTime(3000));
buffered.subscribe(val => console.log(val));

这将每 3 秒将在缓冲期内收集的所有值作为数组发出。

闪电战演示:https ://stackblitz.com/edit/rxjs-vpu97e?file=index.ts

在您的示例中,我认为您只需将其用作:

filter((snapshot) => snapshot.payload.val().reference), // this is all you need for filter
bufferTime(2000),
withTransaction((snapshots:[]) => {
   ...
})
于 2020-03-26T18:25:24.630 回答