如果我们在表单上有一个流
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])
有几种方法可以将其转换为纯数字流(不包括解决方案中的过滤。)
以下是三种可能的解决方案:
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
这一切看起来都不错。然而,有几件事需要考虑。如果我们更改源流以使其异步:
const asyncStream = Rx.Observable.interval(1000)
.select((val, idx) => idx + 8).take(5);
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
asyncStream, 13, 14, 15])
我们使用与之前相同的解决方案得到以下结果:
// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
所以总结一下。using selectMany
or select
follow bymergeAll
解决了生成正确类型的扁平化列表的问题,但不维护顺序。这些解决方案将侦听所有流并在任何流产生值时产生结果。
该concatAll
解决方案的行为略有不同。此解决方案将按顺序侦听每个流,仅在最后一个流完成时才切换到下一个值/流。
所以这些是一些解决方案,你想要哪一个取决于你的需求。然而,所有这些都摆脱了过滤流的需要。