我是 RxJS 世界的新手,对此我有点迷茫。希望有人可以帮助我。
我有一个可观察的源(通过 AngularFire 的 Firebase),它在随机派克时间内不断向我发出大量数据(在 2 秒的窗口中最多发出 50 或 80 次),因为这会减慢我的项目性能,我认为是正确的处理这个问题的方法是将排放分组到一个数组中,然后对接收到的所有数据进行事务处理并将其插入到存储中。
我正在寻找的结果如下所示:
考虑到我会放置 3 秒的“保持”时间,我想要以下结果:
1s 1.5s
--> 30 --> 60 --> 100
1s 2s
--> 5 --> 1 --> 50 --> 70
- [30, 60, 100] --> 以 1.5s 的间隔时间
- [5, 1, 50, 70] --> 以 2s 间隔时间
数组中的值将是从收到的第一个发射开始的特定时间收到的发射。在特定时间之后,它将在下一批排放中“重新启动”初始化(实际上可能在 1 秒或 2 小时内,但随后,间隔将触发 2 秒的排放)
到目前为止我尝试过的是使用 Window 和 Buffer,也许我没有正确使用这些,或者我只是愚蠢但我找不到我刚刚解释的结果。
filter((snapshot) => { if (snapshot.payload.val().reference) { return snapshot; } }),
window(interval(2000)),
mergeAll(),
withTransaction((snapshots:[]) => {
snapshots.forEach(snapshot => {
if (snapshot.type === 'child_changed') {
this.store.add(snapshot.key, snapshot.val());
} else if (snapshot.type === 'child_changed') {
this.store.replace(snapshot.key, snapshot.val());
} else if (snapshot.type === 'child_removed') {
this.store.remove(snapshot.key);
}
})
})
我什至不知道 RxJS 是否有可能(我想是的。我已经看到了很多很酷的东西),但是任何建议或指导如何通过它,将不胜感激。
提前非常感谢!
注意: withTransaction 是一个自定义操作符。