内部合并的 observable 不会以此代码终止(rxjs 5.5.6):
let source = new Subject<string[]>();
// when the source emits a vector of strings, output
// each string with a 1s delay
source.switchMap(v => Observable.from(v)
.map(s => Observable.of(s).delay(1000).do(s => console.log('do: ' + s)))
// only one active observable at time
.mergeAll(1)
).subscribe(val => console.log('result: ' + val));
// emit two vectors, 1.5s apart
Observable.interval(1500).take(2).map(i => ['a' + i, 'b' + i, 'c' + i])
.subscribe(v => source.next(v));
输出是:
do: a0
result: a0
do: b0
do: a1
result: a1
do: c0
do: b1
result: b1
do: c1
result: c1
预期的输出是:
do: a0
result: a0
do: a1
result: a1
do: b1
result: b1
do: c1
result: c1
也就是说,在第二个向量发出后,switchMap 应该取消订阅第一个向量上的可观察对象,取消该可观察对象。虽然 unsubscribe 显然在工作,但内部 observable 仍在运行,作为第一个示例输出中的“do: a0 .. b0 .. c0”的证据。
事实上,预期的输出正是您从这段代码中得到的:
let source =
Observable.interval(1500).take(2).map(i => ['a' + i, 'b' + i, 'c' + i]);
source.switchMap(v => Observable.from(v)
.map(s => Observable.of(s).delay(1000).do(s => console.log('do: ' + s)))
.mergeAll(1)
).subscribe(val => console.log('result: ' + val));
但是为什么第一个示例的行为方式不同呢?