2

在处理各种背压场景时,我实现了一个情况,其中一个订阅者使用缓冲区很慢,而另一个订阅者消耗了扔给它的任何内容。那是使用 Scala 和 Akka Streams。如果您愿意,可以在此处查看代码以及在此处运行它的测试。

我通常会尝试开发一个 RxJava 版本进行比较,但我被困在了这个版本上。在 Akka Streams 中,我可以构建一个图,其中一个源在 2 个频道上进行广播,并从这些频道中消耗一个慢速接收器和一个快速接收器。每个通道都可以独立应用缓冲和节流。在 RxJava 中,有share用于广播的运算符,但缓冲和节流逻辑不在 . 上Subscriber,而是在Observable. 因此,我不确定如何应用缓冲和节流并且不会影响两个订阅者。Akka Streams 和 RxJava 都是 Rx 的实现,我希望有一种方法可以得到我想要的。

这是我正在尝试做的图片版本。

4

2 回答 2

1

像这样的东西?

import rx.Observable;
import rx.observables.ConnectableObservable;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {

        //emits Long every 100 milliseconds, and publishes to all Subscribers simultaneously through ConnectableObservable
        ConnectableObservable<Long> source = Observable.interval(100, TimeUnit.MILLISECONDS).publish();

        //buffers emissions into Lists within 1 second durations, and first Subscriber prints them
        source.buffer(1,TimeUnit.SECONDS).subscribe(System.out::println);

        //no buffering, just push emissions directly to this second Subscriber which prints them
        source.subscribe(System.out::println);

        //start firing emissions now both Subscribers are connected
        source.connect();

        //sleep to keep program alive for 10 seconds
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Subscriber没有节流或任何运算符的概念。这些是Observable通过产生不同Observable实现的各种运算符在侧面完成的。这Subscriber是非常愚蠢的,并且只是将排放作为Observable链中的最后一步来消耗。发射发生在哪个线程上是不可知的,更不用说它是否被Observable传递给它的项目限制了。

于 2016-09-02T02:37:08.210 回答
0

如果您想要不同的背压行为,您应该能够shared()以不同的方式为不同的订阅者装饰 observable。

例如

Observable<Integer> source = Observable.interval(0, 1, TimeUnit.SECONDS).share();

// Naked source for fast consumers.
Observable<Integer> fast = source; 

// Buffer for slow consumers that use backpressure.
Observable<Integer> slow = source.onBackpressureBuffer();

订阅者fastslow以上将最终使用相同的共享源。

请注意,fast不响应背压是因为interval不响应背压。

onBackpressureXXX() 有不同的风格,您可以使用它们获得不同的行为。

于 2016-09-26T02:36:58.797 回答