我有一个rx.Observable
将任务进度发送到onNext()
. onNext()
排放有时会发生得如此之快,以至于无法Observer
跟上,从而导致背压。我想通过仅缓冲来自Observable
.
例如:
Observable
发出1并Observer
接收1。- 虽然
Observer
仍在处理1,但Observable
发出2、3和4。 Observer
完成处理1并开始处理4(排放2和3被丢弃)。
这似乎是在 Rx Observable 中处理进度的常见情况,因为您通常只关心使用最新进度信息更新 UI。但是我一直无法弄清楚如何做到这一点。
任何人都知道如何使用 RxJava 实现这一点?