2

我有以下测试,它使用 WireMock 存根一个 HTTP 服务:该服务先失败两次,然后成功。我使用 retryWhen 实现指数退避,因此第一次重试(第二次调用)发生在第一次调用之后 2 秒,第二次重试(第三次调用,也是成功的那次)发生在第二次调用之后 4 秒。我尝试用 TestScheduler.advanceTimeBy 推进时钟,但测试在验证 onNext 事件时失败(错误:Expected size:<1> but was:<0>)。如果我把 TestScheduler 替换为 Schedulers.computation(),一切正常。我究竟做错了什么?

/**
 * Exercises the retry-with-back-off pipeline against a stubbed HTTP endpoint that
 * answers "bad request" twice and then succeeds.
 *
 * <p>The WireMock scenario "Feign retry" walks through three states: the initial
 * {@code STARTED} state and state "1" both return a bad request, state "2" returns
 * the JSON fixture {@code /joke.json}. The call is wrapped in {@code retryWhen} with
 * {@link #onErrorTryAgain}, whose timers run on a {@code TestScheduler} so the test
 * can move virtual time forward by hand.
 *
 * <p>NOTE(review): {@code TestScheduler.advanceTimeBy} can only run timers that are
 * already scheduled at the moment it is called. If {@code client.tellAJoke()}
 * delivers its failure on another thread (not visible from this method), the error
 * may reach {@code retryWhen} after the clock was advanced, leaving the retry timer
 * pending in virtual time forever. That would match the reported failure of zero
 * onNext events. TODO confirm the threading of {@code tellAJoke()}.
 *
 * @throws IOException presumably from reading the {@code /joke.json} fixture
 */
@Test
public void testRetry() throws IOException {
    // First call: scenario is in its initial state, reply with a bad request and move to "1".
    server.stubFor(get(urlPathEqualTo(RANDOM_JOKE_URI))
            .inScenario("Feign retry")
            .whenScenarioStateIs(STARTED)
            .willReturn(badRequest())
            .willSetStateTo("1"));

    // Second call: still a bad request, then move to "2".
    server.stubFor(get(urlPathEqualTo(RANDOM_JOKE_URI))
            .inScenario("Feign retry")
            .whenScenarioStateIs("1")
            .willReturn(badRequest())
            .willSetStateTo("2"));

    // Third call: succeed with the JSON fixture as the response body.
    server.stubFor(get(urlPathEqualTo(RANDOM_JOKE_URI))
            .inScenario("Feign retry")
            .whenScenarioStateIs("2")
            .willReturn(ok()
                    .withHeader("Content-Type", "application/json")
                    .withBody(copyToByteArray(getClass().getResourceAsStream("/joke.json")))));

    TestSubscriber<JokeServiceResponse> subscriber = new TestSubscriber<>();
    TestScheduler scheduler = new TestScheduler();
    // GOTCHA ALERT: must use defer for the Observable to be reevaluated
    Observable.defer(() -> client.tellAJoke()
            .toObservable())
            .retryWhen(errors -> onErrorTryAgain(errors, scheduler))
            .subscribe(subscriber);

    // Move virtual time past the first back-off delay (2 s); nothing should have been emitted yet.
    scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
    subscriber.assertNoValues();
    // Move virtual time past the second back-off delay (4 s).
    // NOTE(review): this assertion expects the successful third call not to have emitted yet,
    // which only holds if the call completes asynchronously — confirm.
    scheduler.advanceTimeBy(4, TimeUnit.SECONDS);
    subscriber.assertNoValues();
    // Waits up to 10 s for completion or error.
    // NOTE(review): this presumably waits in wall-clock time and does not advance the TestScheduler — confirm.
    subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);
    subscriber.assertNoErrors();
    // Exactly one response is expected; its content is checked by assertResponse.
    assertThat(subscriber.getOnNextEvents())
            .hasSize(1)
            .is(new Condition<JokeServiceResponse>() {
                @Override
                public boolean matches(JokeServiceResponse response) {
                    // assertResponse is relied on to fail the test itself, so the condition just returns true.
                    assertResponse(response);
                    return true;
                }
            }, atIndex(0));
}

/**
 * Turns the error stream supplied by {@code retryWhen} into a stream of delayed
 * retry signals with exponential back-off.
 *
 * <p>Each error is paired with an attempt number taken from 1..5, and a timer of
 * 2^attempt seconds (2, 4, 8, 16, 32) is started on the given scheduler.
 *
 * @param errors    the failures reported by the source being retried
 * @param scheduler the scheduler the back-off timers run on
 * @return an observable that emits once per error after that attempt's delay has elapsed
 */
private Observable<?> onErrorTryAgain(Observable<? extends Throwable> errors, Scheduler scheduler) {
    // Only the first five errors are paired with an attempt number.
    final Observable<Integer> attemptNumbers = Observable.range(1, 5);
    return errors
            .zipWith(attemptNumbers,
                    (failure, attempt) -> new SimpleImmutableEntry<Integer, Throwable>(attempt, failure))
            // The Throwable (entry value) can be inspected here to decide whether to retry
            // or return Observable.error instead. 1L << n equals 2^n for the attempts 1..5.
            .flatMap(attempt -> Observable.timer(1L << attempt.getKey(), TimeUnit.SECONDS, scheduler));
}
4

0 回答 0