考虑使用zip运算符将两个无限 Observable 压缩在一起,其中一个发出项目的频率是另一个的两倍。
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后在它们的发射速率之间切换,第一个 Observable 最终将赶上另一个。
随着缓冲区越来越大,这将在某些时候导致内存爆炸。
如果第一个 observable 将发射几个小时的项目,而第二个将在最后发射一个项目,也会发生同样的情况。
我如何实现此操作员的有损行为?我只想在我从两个流中获得排放的任何时候排放,我不在乎我错过了更快的流中有多少排放。
说明:
- 我在这里要解决的主要问题是由于
zip
运算符的无损特性导致的内存爆炸。 - 即使两个流每次都发出相同的值,我也想随时从两个流中获得排放
例子:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
常规zip
将产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
我希望它产生的输出:
[1, 10]
[3, 20]
[5, 30]
说明:
有损zip
算子的zip
缓冲区大小为1
。这意味着它只会保留最先发出的流中的第一项,并且会丢失所有其余的项(到达第一项和第二个流的第一次发射之间的项)。因此,示例中发生的情况如下:stream1
emits 1
,有损 zip “记住”它并忽略所有项目,stream1
直到stream2
发出。第一次发射stream2
就是10
这么stream1
松2
。在相互发射(有损的第一个发射zip
)之后,它重新开始: "remember" 3
, "loose" 4
, emit [3,20]
。然后重新开始: "remember" 5
、 "loose"6
和7
, emit [5,30]
。然后重新开始:40
, "松散" 50
, 60
,70
并等待下一个项目stream1
。
示例 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
在这种情况下,普通zip
操作员会爆炸内存。
我不希望它。
总结:
本质上,我希望有损zip
运算符只记住stream 1
先前相互发射之后发出的第一个值,并在stream 2
赶上stream 1
. 并重复。