RxJava 通过publish
协调来自单个消费者的请求的操作符来支持这一点,也就是说,它以与最慢的消费者请求一样快的固定速率请求。不幸的是,目前没有 RxScala 2,只有 RxJava 2 支持 Reactive-Streams 规范,因此,将其转换为 Scala 可能会有些不便:
Flowable.fromPublisher(Flowable.range(1, 1000))
.publish(f ->
Flowable.mergeArray(
f.observeOn(Schedulers.computation()).map(v -> v * v),
f.observeOn(Schedulers.computation()).map(v -> v * v * v)
)
)
.blockingSubscribe(System.out::println);
ConnectableObservable
另一种方法是在所有消费者都订阅后使用 a并手动连接:
ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
.publish();
co.observeOn(Schedulers.computation()).map(v -> v * v)
.subscribe(System.out::println);
co.connect();