3

类似于数组,flatten([1, 2 [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6].

我想在 rxjs observables 中这样做:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()

test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7

mergeAll 不起作用并引发错误。

这是非常肮脏的解决方案:

const inElegant$ = Rx.Observable.merge(
  test$.filter(x => x instanceof Rx.Observable).mergeAll(),
  test$.filter(x => !(x instanceof Rx.Observable))
)

inElegant$.subscribe(x => console.log(x));

有没有更好的解决方案?

jsbin http://jsbin.com/vohizoqiza/1/edit?js,console

4

3 回答 3

4

如果我们在表单上有一个流

const stream = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])

有几种方法可以将其转换为纯数字流(不包括解决方案中的过滤。)

以下是三种可能的解决方案:

// prints 1, 2, 3, 4, 5, 6, 7
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .concatAll()
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7
stream
    .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .mergeAll()
    .subscribe(x => console.log(x));

这一切看起来都不错。然而,有几件事需要考虑。如果我们更改源流以使其异步:

const asyncStream = Rx.Observable.interval(1000)
    .select((val, idx) => idx + 8).take(5);

const stream = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
    asyncStream, 13, 14, 15])

我们使用与之前相同的解决方案得到以下结果:

// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .concatAll()
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
    .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .mergeAll()
    .subscribe(x => console.log(x));

所以总结一下。using selectManyor selectfollow bymergeAll解决了生成正确类型的扁平化列表的问题,但不维护顺序。这些解决方案将侦听所有流并在任何流产生值时产生结果。

concatAll解决方案的行为略有不同。此解决方案将按顺序侦听每个流,仅在最后一个流完成时才切换到下一个值/流。

所以这些是一些解决方案,你想要哪一个取决于你的需求。然而,所有这些都摆脱了过滤流的需要。

于 2016-05-31T08:45:46.483 回答
1

对于这些情况,我个人使用toObservable转换函数。该函数保持 observable 不变,并将其他类型包装在 observable 中(with Rx.Observable.return)。所以它是这样使用的:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).map(toObservable).mergeAll()

这与您正在做的事情很接近,但我发现将其打包在一个可以在其他上下文中重用的单独函数中很方便。

于 2016-05-29T14:24:34.403 回答
0

根据文档 mergeAll

将可观察序列的可观察序列合并为可观察序列。

你有一个混合的集合(数字和可观察的)所以不起作用。

做到这一点的唯一方法是你如何做,用不同的方法处理这两种类型:尽管你应该问自己,你是如何在同一序列中同时拥有 Observable 和原始类型的。

于 2016-05-29T07:57:17.857 回答