5

我正在尝试熟悉反应式背压处理的问题,特别是通过阅读此 wiki:https ://github.com/ReactiveX/RxJava/wiki/Backpressure

在缓冲区段落中,我们有这个更复杂的示例代码:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

如果我理解正确,我们通过为缓冲区操作符生成去抖信号流来有效地去抖突发源流。

但是为什么我们需要在这里使用 publish 和 refcount 操作符呢?如果我们只是丢弃它们会导致什么问题?评论并没有让我更清楚,RxJava Observables 不是默认支持多播吗?

4

1 回答 1

3

答案在于冷热观测值之间的差异。

缓冲区运算符结合了 2 个流,无法知道它们有一个共同的来源(在您的情况下)。激活(订阅)后,它将同时订阅它们,这反过来会触发对原始输入的 2 个不同订阅。

现在可能发生 2 件事,要么输入是热的 observable,订阅除了注册监听器外没有任何作用,一切都会按预期工作,或者是冷的 observable,每个订阅都会导致潜在的不同和不同步的流.

例如,冷可观察对象可以是在订阅时执行网络请求并通知结果的对象。不调用发布意味着将完成 2 个请求。

Publish+refcount/connect 是将冷的 observable 转换为热的 observable 的常用方法,确保会发生单个订阅,并且所有流的行为都相同。

于 2015-10-04T16:20:45.087 回答