使用 RxJava 2.0 的简单解决方案,翻译自 RxJS的相同问题的答案,它结合了throttleFirst 和 debounce,然后删除重复项。
private <T> ObservableTransformer<T, T> debounceImmediate() {
return observable -> observable.publish(p ->
Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS),
p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
}
@Test
public void testDebounceImmediate() {
Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
.compose(debounceImmediate())
.blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}
使用 limit() 或 take() 的方法似乎无法处理长期存在的数据流,我可能希望持续观察,但仍会立即对一段时间内看到的第一个事件采取行动。