7

我是响应式编程领域的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。

以下是我试图实现的工作流程:

  1. Flowable 的连续字符串流。

  2. 执行耗时操作并将消息更改为另一个字符串

  3. 执行另一个耗时的操作。

现在我正在使用以下代码:

{
    Flowable.create(subscriber -> {
             some_stream.forEach(data -> {
                subscriber.onNext(data);
            });
        }, BackpressureStrategy.BUFFER).
    subscribeOn(Schedulers.io()). // Data emission will run io scheduler
    observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
    map(val -> Time_Consuming_Task(val)). // Task returns another string
    observeOn(Schedulers.io()).  / Next consumer will run on computation scheduler
    subscribe(val -> Another_Time_Consuming_Task(val));
}

现在对于小型操作,我没有看到任何与背压相关的问题。

但是对于大型流,我不知道它会如何表现。

现在我的问题是:-

  1. 在BackpressureStrategy.BUFFER的情况下,默认缓冲区大小是多少?数据在哪里被缓冲?

  2. 如果我想在每个耗时的任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 运算符吗?

  3. 如果缓冲区已满,我不想丢失数据,我想等待或在那种情况下?

4

1 回答 1

5

要回答您的问题:

1. 默认缓冲区大小因平台而异。在 JVM 上,每个环形缓冲区有 128 个项目,在 Android 上是 16 个项目(来源

这比之前的 1024 有所降低(您可以在此处查看RxJava 中正在实现的更改)。如果需要,您还可以使用系统属性自行调整:

System.setProperty("rx.ring-buffer.size", "8");

由于它们被称为环形缓冲区,它们存储在内存中。你可以在这里阅读更多关于它们的信息。

2. & 3. 如果它满了,它就会开始覆盖自己。在这种情况下,使用 onBackpressureBuffer

循环缓冲区的结果是,当它已满并执行后续写入时,它开始覆盖最旧的数据。

引用关于Circular buffer的 wiki 文章。

当您知道您rx.ring-buffer.size的 .

onBackpressureBuffer(int capacity, // This is the given bound, not a setter for the ring buffer
    Action0 onOverflow, // The desired action to execute
    BackpressureOverflow.Strategy strategy) // The desired strategy to use

再说一次,我不能说得更好,让我引用 RxJava wiki on Backpressure (2.0)

BackpressureOverflow.Strategy 实际上是一个接口,但是 BackpressureOverflow 类提供了 4 个静态字段,它的实现代表了典型的动作:

  • ON_OVERFLOW_ERROR:这是前两个重载的默认行为,发出 BufferOverflowException 信号
  • ON_OVERFLOW_DEFAULT: 目前与 ON_OVERFLOW_ERROR 相同
  • ON_OVERFLOW_DROP_LATEST:如果发生溢出,当前值将被简单地忽略,并且一旦下游请求将只传递旧值。
  • ON_OVERFLOW_DROP_OLDEST:删除缓冲区中最旧的元素并将当前值添加到其中。

请注意,最后两种策略会导致流中的不连续性,因为它们会丢弃元素。此外,它们不会发出 BufferOverflowException 信号。

这是一个例子:

Flowable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

值得注意的是:

RxJava 2.x 中的Observable类型没有背压的概念。实现Observable实际上与默认使用相同onBackpressureBuffer()。UI 事件、一次性网络请求和状态更改都应该使用这种方法。、Completable和类型也可以规定这种行为MaybeSingle

如果你需要支持背压,RxJava 2.x 的新类 ,Flowable就像Observable在 RxJava 1.x 中一样支持背压。但是,更新后的库现在需要明确选择背压策略以防止意外MissingBackpressureExceptions

阅读更多:

于 2017-11-24T17:53:31.743 回答