1

理论上,应该可以通过 flatMap() 实现任何 RxJS 操作符(just() 和 flatMap() 本身除外)。例如,map() 可以实现为:

/**
 * Re-implements `map` using only `flatMap` and `just`.
 *
 * Each source value is run through `selector`, wrapped in a single-value
 * observable with `Rx.Observable.just`, and flattened back into the
 * output by `flatMap`.
 *
 * @param {object} source - Object exposing a `flatMap` method.
 * @param {Function} selector - Called with each source value only.
 * @returns {*} Whatever `source.flatMap` returns.
 */
function map(source, selector) {
  const project = (value) => {
    const mapped = selector(value);
    return Rx.Observable.just(mapped);
  };
  return source.flatMap((value) => project(value));
}

如何通过 flatMap() 实现 merge()?(当然,也要避免使用 mergeAll()。)

4

1 回答 1

4

如果利用 flatMap 的回调函数也可以返回数组这一事实,这看起来是可行的。

/**
 * Defines `merge` on the observable prototype using only `just` and
 * `flatMap`.
 *
 * @param {*} other - The observable to merge with the receiver.
 * @returns {*} The result of flattening `[this, other]` twice.
 */
Rx.Observable.prototype.merge = function (other) {
  // Emit the pair of observables as one array value.
  const pair = Rx.Observable.just([this, other]);

  // First flatMap: the array is flattened into an observable of observables.
  const observables = pair.flatMap((arr) => arr);

  // Second flatMap: the inner observables are flattened out.
  return observables.flatMap((inner) => inner);
};

编辑 1

使用 RxJS 6 和pipe语法

import {of} from 'rxjs'
import {flatMap} from 'rxjs/operators'

/**
 * Creates a pipeable operator that merges `other` into the source
 * observable, using only `of` and `flatMap`.
 *
 * @param {*} other - The observable to merge with the source.
 * @returns {Function} A function taking the source observable and
 *   returning the result of flattening `[source, other]` twice.
 */
function merge (other) {
  return source => of([source, other]).pipe(
           // Flattens the array into an observable of observables
           flatMap(arr => arr),
           // Flattens out the inner observables
           flatMap(x => x)
         );
}

const {timestamp, map, flatMap, take} = rxjs.operators;
const {interval, of: just} = rxjs;

// Demo stream 1: built from interval(2000); each emission is timestamped
// and rendered as a labelled string.
const source1 = interval(2000).pipe(
  timestamp(),
  map(({ timestamp: at, value }) => `Interval 1 at ${at} w/ ${value}`)
);

// Demo stream 2: same shape as stream 1, built from interval(3000).
const source2 = interval(3000).pipe(
  timestamp(),
  map(({ timestamp: at, value }) => `Interval 2 at ${at} w/ ${value}`)
);

/**
 * Pipeable `merge` built only from `just` and `flatMap`.
 *
 * @param {*} other - The observable to merge with the source.
 * @returns {Function} A function taking the source observable and
 *   returning the result of flattening `[source, other]` twice.
 */
function mergeFromFlatMap (other) {
  const identity = (value) => value;

  return function mergeWithOther (source) {
    const pair = just([source, other]);
    // The first flatMap unpacks the array; the second flattens each sequence.
    return pair.pipe(flatMap(identity), flatMap(identity));
  };
}

// Merge the two demo streams with the flatMap-based operator, pass the
// result through take(20), and log every emitted value.
const merged = source1.pipe(mergeFromFlatMap(source2), take(20));
const logValue = console.log.bind(console);
merged.subscribe(logValue);
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

于 2015-06-19T16:13:24.670 回答