6

我有一个rx.Observable将任务进度发送到onNext(). onNext()排放有时会发生得如此之快,以至于无法Observer跟上,从而导致背压。我想通过仅缓冲来自Observable.

例如:

  • Observable发出1Observer接收1
  • 虽然Observer仍在处理1,但Observable发出234
  • Observer完成处理1并开始处理4(排放23被丢弃)。

这似乎是在 Rx Observable 中处理进度的常见情况,因为您通常只关心使用最新进度信息更新 UI。但是我一直无法弄清楚如何做到这一点。

任何人都知道如何使用 RxJava 实现这一点?

4

2 回答 2

9

onBackPressureLatest是你的朋友吗?:) http://reactivex.io/RxJava/javadoc/rx/Observable.html#onBackpressureLatest()

于 2015-08-05T07:11:13.377 回答
0

Observable.debounce听起来像你需要的。在下面的示例中,仅在每 200 毫秒窗口中来自可观察对象的最新发射将被发送给观察者。

observable
    .debounce(200, TimeUnit.MILLISECONDS)
    .subscribe(observer);
于 2015-08-04T23:12:58.467 回答