0

我试图了解如何repeatWhen在 RxJava 中使用。javadoc令人困惑,当我在网上搜索时有人建议使用如下

MyRepeatFunction myRepeatFunction = new MyRepeatFunction(3);
observable1.repeatWhen(myRepeatFunction).subscribe((t) -> System.out.print(t));
class MyRepeatFunction implements Function<Observable<Object>, ObservableSource<Object>> {
    private int repeatCount;
    
    public MyRepeatFunction(int repeatCount) {
        this.repeatCount = repeatCount;
    }
    
    @Override
    public @NonNull ObservableSource<Object> apply(@NonNull Observable<Object> t) throws Throwable {
        return t.delay(1, TimeUnit.SECONDS);
    }
}

该代码return t.delay(1, TimeUnit.SECONDS);将使其永远持续下去。在主线程停止之前它不会停止。我想重复可观察但仅重复次数或直到特定不正确。

我很困惑。帮助表示赞赏。

4

1 回答 1

0

那这个呢:

handler – 接收一个 Observable 通知,用户可以使用这些通知完成或出错,中止重试

你会得到一个注入到 -operator 中的 observable ,它会从源retryWhen中发出每个错误。onError你可以决定下一步该做什么。在 中发出一个值,任何值,都会导致从底部 ( ) 到顶部retryWhen的重新订阅。retryWhen

该示例显示,您可以在内部使用 throwable-observableretryWhen并应用 take-operator 以限制重新订阅。此外,为了完成信号的来源,您可以使用takeUntil,这需要另一个可观察的。takeUntil将发送一个onComplete下游并且订阅将完成。RxJava 将负责其余的工作,这意味着开放订阅(重试)将被取消。

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class So65154637 {
  @Test
  void main() {
    PublishSubject<String> shouldFinish$ = PublishSubject.create();
    TestScheduler testScheduler = new TestScheduler();

    AtomicInteger count = new AtomicInteger(0);

    Observable<Object> obs =
        Observable.fromAction(
            () -> {
              System.out.println("inc");
              count.incrementAndGet();
              throw new RuntimeException("woat");
            })
        .retryWhen(
            throwableObservable ->
                throwableObservable.take(10).delay(1L, TimeUnit.SECONDS, testScheduler))
        .takeUntil(shouldFinish$);

    TestObserver<Object> test = obs.test();

    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);

    shouldFinish$.onNext("");

    assertThat(count.get()).isEqualTo(3);
    test.assertComplete();
  }
}
于 2020-12-05T21:28:08.693 回答