6

我正在创建一个简单的应用程序,用于使用 RxAndroidBle 库连接蓝牙设备(干得好!)。我遇到的是有时当我连接到设备时收到状态为 133 的 Gatt 错误。我知道它可能会发生,所以我想做的是在发生该错误时重试所有操作。这不是问题,我可以通过操作员轻松做到这一点retryWhen(),但是我还有另一个要求 - 如果连接不成功,则流必须在 30 秒后终止。我曾经timeout()这样做过,但问题是当我重试时,计时器又开始了。

所以问题是如何将 timeout() 运算符与 retryWhen() 结合起来,这样我就可以重试一些特定的错误,但保持计数器继续运行。

我有一些组合可观察对象的想法,或者一些单独的可观察对象,它们将在超时后检查连接状态,但我想知道我是否可以在单个可观察对象中做到这一点。

到目前为止,我的代码如下所示:

public Observable<ConnectingViewState> connectToDevice(String macAddress) {
        final RxBleDevice rxBleDevice = rxBleClient.getBleDevice(macAddress);
        return rxBleDevice.establishConnection(false)
                .subscribeOn(Schedulers.io())
                .map(rxBleConnection -> new ConnectingViewState.ConnectedViewState(rxBleConnection))
                .cast(ConnectingViewState.class)
                .timeout(40, TimeUnit.SECONDS)
                .startWith(new ConnectingViewState.ConnectingInProgressViewState())
                .retryWhen(errors -> errors.flatMap(error -> {
                            if (isDefaultGattError(error)) {
                                return Observable.just(new Object());
                            } else {
                                return Observable.error(error);
                            }
                        }
                ))
                .onErrorReturn(throwable -> new ConnectingViewState.ErrorState(throwable));
    }
4

2 回答 2

6

retryWhen运营商通过重新订阅其上方的运营商链来工作。由于您将您放在timeout它之前,因此会重新订阅所述超时,从而再次从头开始计数。

放置timeout 应该对retryWhen整个可重试流应用全局超时之后。

于 2017-05-04T12:42:07.863 回答
1

如前所述,我用 RxJava2 编写了一个测试。代码取自《Reactive Programming with RxJava》一书(第 257 页)

private final static int ATTEMPTS = 10;

@Test
public void name() throws Exception {
    Subject<Integer> establishConnection = PublishSubject.create();
    TestScheduler testScheduler = new TestScheduler();

    Observable<Integer> timeout = establishConnection.
            retryWhen(failures -> failures
                    .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
                            {
                                // check here for your error if(...)

                                if (attempt < ATTEMPTS) {
                                    long expDelay = (long) Math.pow(2, attempt - 2);
                                    return Observable.timer(expDelay, TimeUnit.SECONDS, testScheduler);
                                } else {
                                    return Observable.error(err);
                                }
                            }
                    )
                    .flatMap(x -> x))
            .timeout(30, TimeUnit.SECONDS, testScheduler)
            .onErrorResumeNext(throwable -> {
                if (throwable instanceof TimeoutException) {
                    return Observable.just(42);
                }
                return Observable.error(throwable);
            });

    TestObserver<Integer> test = timeout.test();

    testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
    establishConnection.onError(new IOException("Exception 1"));

    testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
    establishConnection.onError(new IOException("Exception 2"));

    testScheduler.advanceTimeBy(31, TimeUnit.SECONDS);

    test.assertValue(42);
}
于 2017-05-03T18:48:10.730 回答