44

考虑以下用例:

  • 需要尽快交付第一件物品
  • 需要以1 秒的超时时间去抖动以下事件

我最终实现了自定义运算符,OperatorDebounceWithTime然后像这样使用它

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))

CustomOperatorDebounceWithTime立即交付第一个项目,然后使用OperatorDebounceWithTime操作员的逻辑去抖动后面的项目。

有没有更简单的方法来实现所描述的行为?让我们跳过compose运算符,它不能解决问题。我正在寻找一种在不实现自定义运算符的情况下实现这一目标的方法。

4

14 回答 14

40

更新:
根据@lopar 的评论,更好的方法是:

Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))

像这样的工作:

String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
    .subscribe(s -> System.out.println(s));
于 2015-05-09T22:42:42.990 回答
18

@LortRaydenMK 和 @lopar 的答案是最好的,但我想提出其他建议,以防它碰巧对您或处于类似情况的人更有效。

有一个变体debounce()需要一个函数来决定这个特定项目去抖动多长时间。它通过返回一个在一段时间后完成的 observable 来指定这一点。您的函数可以返回empty()第一项和timer()其余的。类似(未经测试):

String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
    .debounce(item -> item.equals("one")
            ? Observable.empty()
            : Observable.timer(1, TimeUnit.SECONDS));

诀窍是这个函数必须知道哪个项目是第一个。你的序列可能知道这一点。如果没有,你可能不得不zip()使用range()或其他东西。在这种情况下,最好在另一个答案中使用该解决方案。

于 2015-10-07T23:26:55.370 回答
10

使用 RxJava 2.0 的简单解决方案,翻译自 RxJS的相同问题的答案,它结合了throttleFirst 和 debounce,然后删除重复项。

private <T> ObservableTransformer<T, T> debounceImmediate() {
    return observable  -> observable.publish(p -> 
        Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS), 
            p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
} 

@Test
public void testDebounceImmediate() {
    Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
            .compose(debounceImmediate())
            .blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}

使用 limit() 或 take() 的方法似乎无法处理长期存在的数据流,我可能希望持续观察,但仍会立即对一段时间内看到的第一个事件采取行动。

于 2017-03-08T07:53:51.310 回答
8

LordRaydenMK和 lopar 的回答有一个问题:你总是丢失第二项。我想以前没有人意识到这一点,因为如果你有一个去抖动,你通常会有很多事件,而第二个事件无论如何都会被去抖动。永不丢失事件的正确方法是:

observable
    .publish(published ->
        published
            .limit(1)
            .concatWith(published.debounce(1, TimeUnit.SECONDS)));

别担心,你不会得到任何重复的事件。如果您不确定,您可以运行此代码并自行检查:

Observable.just(1, 2, 3, 4)
    .publish(published ->
        published
            .limit(1)
            .concatWith(published))
    .subscribe(System.out::println);
于 2018-03-26T17:38:41.013 回答
7

使用debounce带有函数的版本并以这种方式实现该函数:

    .debounce(new Func1<String, Observable<String>>() {
        private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
        @Override
        public Observable<String> call(String s) {
             // note: standard debounce causes the first item to be
             // delayed by 1 second unnecessarily, this is a workaround
             if (isFirstEmission.getAndSet(false)) {
                 return Observable.just(s);
             } else {
                 return Observable.just(s).delay(1, TimeUnit.SECONDS);
             }
        }
    })

第一项立即发出。后续项目延迟一秒。如果延迟的 observable 在下一个项目到达之前没有终止,它会被取消,所以预期的去抖动行为就实现了。

于 2017-03-11T00:42:22.433 回答
7

基于@lopar 评论的 Kotlin 扩展函数:

fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}

fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}
于 2019-06-06T14:10:08.350 回答
2

Ngrx - rxjs 解决方案,将管道一分为二

onMyAction$ = this.actions$
    .pipe(ofType<any>(ActionTypes.MY_ACTION);

lastTime = new Date();

@Effect()
onMyActionWithAbort$ = this.onMyAction$
    .pipe(
        filter((data) => { 
          const result = new Date() - this.lastTime > 200; 
          this.lastTime = new Date(); 
          return result; 
        }),
        switchMap(this.DoTheJob.bind(this))
    );

@Effect()
onMyActionWithDebounce$ = this.onMyAction$
    .pipe(
        debounceTime(200),
        filter(this.preventDuplicateFilter.bind(this)),
        switchMap(this.DoTheJob.bind(this))
    );
于 2018-08-06T10:59:27.037 回答
1

我对 Dart 的解决方案:

extension StreamExt<T> on Stream<T> {
  Stream<T> immediateDebounce(Duration duration) {
    var lastEmit = 0;
    return debounce((event) {
      if (_now - lastEmit < duration.inMilliseconds) {
        lastEmit = _now;
        return Stream.value(event).delay(duration);
      } else {
        lastEmit = _now;
        return Stream.value(event);
      }
    });
  }
}

int get _now =>  DateTime.now().millisecondsSinceEpoch;
于 2020-05-15T19:17:17.880 回答
1

为了防止使用这个进行双重订阅:

    const debouncedSkipFirstStream$ = stream$.pipe(
        map((it, index) => ({ it, index })),
        debounce(({ index }) => (
            index ? new Promise(res => setTimeout(res, TimeUnit.SECONDS))
                : Rx.of(true))),
        map(({ it }) => it),
    );

如果使用拆分解决方案,您将看到“运行”打印两次

x = rxjs.Observable.create(o=>{
    console.info('run');
    o.next(1);
    o.next(2);
});
a = x.pipe(rxjs.operators.take(1));
b = x.pipe(rxjs.operators.skip(1), rxjs.operators.debounceTime(60));
rxjs.concat(a, b).subscribe(console.log);
于 2020-03-12T07:03:56.567 回答
1

我和

Flowable.concat(

    flowable // emits immediately
        .take(1)
        .skipWhile { it.isEmpty() },

    flowable // same flowable, but emits with delay and debounce
        .debounce(2, TimeUnit.SECONDS)
)
    .distinctUntilChanged()
于 2020-06-30T10:53:25.580 回答
1

如果有人在 2021 年寻找这个:

@OptIn(FlowPreview::class)
fun <T> Flow<T>.debounceImmediate(timeMillis: Long): Flow<T> =
    withIndex()
        .onEach { if (it.index != 0) delay(timeMillis) }
        .map { it.value }

用法:

authRepository.login(loginDto)
                    .debounceImmediate(10000)
于 2021-09-14T22:35:29.940 回答
1

阅读完这篇文章后,我最终使用throttleLatest运算符获得了与我正在寻找的立即去抖动非常相似的行为。

油门最新弹珠图

以下代码将立即发出第一个项目,然后每 500 毫秒检查一次新项目。只有在该 500 毫秒窗口内收到的最新事件才会被发送出去。

observable.throttleLatest(500, TimeUnit.MILLISECONDS)
于 2021-09-16T13:17:27.973 回答
0

对于那些试图使用 Kotlin Flow 解决相同问题的人:

fun <T> Flow<T>.throttleFirst(timeout: Duration): Flow<T> {
    var job = Job().apply { complete() }
    return onCompletion { job.cancel() }.run {
        flow {
            coroutineScope {
                collect { value ->
                    if (!job.isActive) {
                        emit(value)
                        job = launch { delay(timeout.inWholeMilliseconds) }
                    }
                }
            }
        }
    }
}

例子:

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.throttleFirst(1.seconds).collect { ... }
// 1, 4, 5
于 2021-12-07T22:26:26.317 回答
0

   view.clicks()
            .throttleFirst(2, TimeUnit.SECONDS)
            .subscribe {
                println("Clicked button")
            }

我发现这是最简单的方法。clicks() 来自 rx 视图绑定。添加此依赖项以获取可观察的视图

 implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0
于 2021-11-10T09:33:42.260 回答