8

考虑使用zip运算符将两个无限 Observable 压缩在一起,其中一个发出项目的频率是另一个的两倍。
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后在它们的发射速率之间切换,第一个 Observable 最终将赶上另一个。
随着缓冲区越来越大,这将在某些时候导致内存爆炸。
如果第一个 observable 将发射几个小时的项目,而第二个将在最后发射一个项目,也会发生同样的情况。

我如何实现此操作员的有损行为?我只想在我从两个流中获得排放的任何时候排放,我不在乎我错过了更快的流中有多少排放。

说明:

  • 我在这里要解决的主要问题是由于zip运算符的无损特性导致的内存爆炸。
  • 即使两个流每次都发出相同的值,我也想随时从两个流中获得排放

例子:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

常规zip将产生以下输出:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

我希望它产生的输出:

[1, 10]
[3, 20]
[5, 30]

说明:
有损zip算子的zip缓冲区大小为1。这意味着它只会保留最先发出的流中的第一项,并且会丢失所有其余的项(到达第一项和第二个流的第一次发射之间的项)。因此,示例中发生的情况如下:stream1emits 1,有损 zip “记住”它并忽略所有项目,stream1直到stream2发出。第一次发射stream2就是10这么stream12。在相互发射(有损的第一个发射zip)之后,它重新开始: "remember" 3, "loose" 4, emit [3,20]。然后重新开始: "remember" 5、 "loose"67, emit [5,30]。然后重新开始:40, "松散" 50, 60,70并等待下一个项目stream1

示例 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

在这种情况下,普通zip操作员会爆炸内存。
我不希望它。

总结:
本质上,我希望有损zip运算符只记住stream 1 先前相互发射之后发出的第一个值,并在stream 2赶上stream 1. 并重复。

4

5 回答 5

10

以下将为您提供所需的行为:

Observable.zip(s1.take(1), s2.take(1)).repeat()

RxJs 5.5+管道语法中:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

解释:

  • repeat运算符(在其当前实现中)在后者完成时重新订阅可观察的源,即在这种特殊情况下,它zip在每次相互发射时重新订阅。
  • zip组合两个可观察对象并等待它们发出。combineLatest也会这样做,这并不重要,因为take(1)
  • take(1)实际上负责内存爆炸并定义有损行为

如果您想在相互发射时从每个流中获取最后一个值而不是第一个值,请使用:

Observable.combineLatest(s1, s2).take(1).repeat()

RxJs 5.5+管道语法中:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

于 2017-10-08T12:07:53.760 回答
1

我认为以下内容应始终采用每个源 Observable 的最后一个值。

const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

现场演示: http: //jsbin.com/vawewew/11/edit ?js,console

这打印:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

如果它们还没有,您可能需要将它们转换为热门的source1Observable 。source2

编辑:

核心部分是source1.takeUntil(source2.skipUntil(source1))。这需要从source1直到source2发出的值。但同时它会忽略source1直到source2发出至少一个值:)。

forkJoin()Observable 工作等到两个源都完成,同时记住每个源的最后一次发射。

然后我们想重复这个过程,所以我们take(1)用来完成链并.repeat()立即重新订阅。

于 2017-10-06T09:49:10.423 回答
1

这给出了序列 [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

可以说运营商更少。不确定它的效率如何。
代码笔

对于更新 #3

然后,您需要每个源项目的密钥。

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

这产生

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen(打开控制台后刷新 CodePen 页面,以获得更好的显示效果)

于 2017-10-06T12:02:03.380 回答
0

你提到缓冲区大小 1,想知道是否压缩两个缓冲区大小为 1 的 ReplaySubject 会做到吗?

于 2017-10-06T09:46:14.477 回答
0

为了清楚起见,我添加了另一个答案,因为它出现在接受的答案之后(但基于我之前的答案)。

如果我误解了,请原谅我,但我期待解决方案来处理切换排放率:

然后我在它们的发射率之间切换,

提供的测试直到第一个流停止才会切换发射率,

Stream1: 1 2    3 4    5 6 7                 
Stream2:     10     20    30 40 50 60 70

所以我尝试了另一个测试

Stream1: 1 2      3 4     5 6
Stream2:    10 20    30 40   50 60

此流的测试数据是

s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6);  s2.next(50); s2.next(60);

据我了解,接受的答案未通过此测试。
它输出

[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]

而我希望看到

[1, 10]
[3, 30]
[5, 50]

如果运算符是对称的(可交换的?)

加强我之前的回答

该解决方案是由基本运算符构建的,因此可以说更容易理解。我不能说它的效率,也许会在另一个迭代中测试它​​。

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])

或者如果愿意,可以采用更紧凑的形式

let index = 0
const output = 
  s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
  .scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged()       //fresh ones only
  .map(keyed => keyed[0])       // de-key
  .pairwise()                   // pair
  .map(x=>[x,index++])          // add a sequence no
  .filter(x => x[1] % 2 === 0)  // take even sequence
  .map(x=>x[0])                 // deindex

用于测试,CodePen(打开控制台后刷新 CodePen 页面,以获得更好的显示效果)

于 2017-10-11T23:03:47.020 回答