在处理各种背压场景时,我实现了一个情况,其中一个订阅者使用缓冲区很慢,而另一个订阅者消耗了扔给它的任何内容。那是使用 Scala 和 Akka Streams。如果您愿意,可以在此处查看代码以及在此处运行它的测试。
我通常会尝试开发一个 RxJava 版本进行比较,但我被困在了这个版本上。在 Akka Streams 中,我可以构建一个图,其中一个源在 2 个频道上进行广播,并从这些频道中消耗一个慢速接收器和一个快速接收器。每个通道都可以独立应用缓冲和节流。在 RxJava 中,有share
用于广播的运算符,但缓冲和节流逻辑不在 . 上Subscriber
,而是在Observable
. 因此,我不确定如何应用缓冲和节流并且不会影响两个订阅者。Akka Streams 和 RxJava 都是 Rx 的实现,我希望有一种方法可以得到我想要的。
这是我正在尝试做的图片版本。