1

我实际上使用的是 Scala,但这个问题对所有 Rx 和流框架都是通用的。

我的用例是我有一个生成的 observable(因此很冷),我希望多个消费者并行使用完全相同的值,但我希望它们具有显着不同的吞吐量。

我需要通过重放广播一个可观察的来完成,但我看到使用最大缓冲区大小重放的常见策略是在溢出时从缓冲区中删除元素(然后对于最慢的消费者丢失)而不是返回 -给生产者施压。如果您将所有广播的 observables 都视为热的,这是有道理的,但是,就我而言,我知道它实际上是冷的并且可以被背压。

在任何 JVM 反应流兼容框架中是否有某种方法可以实现这一点?

非常感谢!

4

1 回答 1

1

RxJava 通过publish协调来自单个消费者的请求的操作符来支持这一点,也就是说,它以与最慢的消费者请求一样快的固定速率请求。不幸的是,目前没有 RxScala 2,只有 RxJava 2 支持 Reactive-Streams 规范,因此,将其转换为 Scala 可能会有些不便:

Flowable.fromPublisher(Flowable.range(1, 1000))
.publish(f -> 
    Flowable.mergeArray(
        f.observeOn(Schedulers.computation()).map(v -> v * v),
        f.observeOn(Schedulers.computation()).map(v -> v * v * v)
    )
 )
 .blockingSubscribe(System.out::println);

ConnectableObservable另一种方法是在所有消费者都订阅后使用 a并手动连接:

ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
    .publish();

co.observeOn(Schedulers.computation()).map(v -> v * v)
  .subscribe(System.out::println);

co.connect();
于 2017-03-05T17:44:35.597 回答