
My application usually has to do two things:

  • Accept only one network request at a time
  • Retry the request if it fails

This is how I implemented it:

public class RequestsLocker {

    private volatile boolean isLocked;

    public <T> Observable.Transformer<T, T> applyLocker() {
        if(!isLocked()) {
            return observable -> observable
                    .doOnSubscribe(() -> {
                        lockChannel();
                    })
                    .doOnUnsubscribe(() -> {
                        freeChannel();
                    });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
        }
    }

    private void lockChannel() {
        isLocked = true;
    }

    private void freeChannel() {
        isLocked = false;
    }

    public boolean isLocked() {
        return isLocked;
    }

}

That seems to work fine.

Now my retryWhen implementation:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
    return observable.flatMap(error -> {
        // For IOExceptions, we retry after a delay
        if (error instanceof IOException) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }

        // For anything else, don't retry
        return Observable.error(error);
    });
}

How I use it:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
}

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .subscribe(subscriber);
}

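The main problem is that doOnUnsubscribe() is called on any error, so the locker is open to any new request until the timer expires and the resubscription happens. While the timer is ticking, the user can make another request.

How can I fix this?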


3 Answers


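The problem is that you are applying your transformer to the source observable, i.e. before your retryWhen. When there is an error, retryWhen always unsubscribes from the source observable and then resubscribes to it, which causes your doOnUnsubscribe to be called.

I suggest you try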

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));            
}


public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE))
            .subscribe(subscriber);
}
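
The same idea, holding the lock around the whole retry sequence rather than around each single attempt, can be sketched in plain Java without RxJava. This is only an illustration under assumptions: SingleFlightRetrier, execute, maxAttempts and delayMillis are hypothetical names that do not appear in the question, and the blocking retry loop stands in for retryWhen.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper, not part of RxJava or of the question's code. */
final class SingleFlightRetrier {

    private final AtomicBoolean busy = new AtomicBoolean(false);

    boolean isBusy() {
        return busy.get();
    }

    /**
     * Runs the request and retries it on IOException, up to maxAttempts attempts.
     * The busy flag is set once before the first attempt and cleared only after
     * the whole sequence has finished, so it stays set during the retry delay.
     */
    <T> T execute(Callable<T> request, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        // Atomic check-and-lock: a second caller is rejected while the flag is set.
        if (!busy.compareAndSet(false, true)) {
            throw new IllegalStateException("Channel is busy now.");
        }
        try {
            for (int attempt = 1; ; attempt++) {
                try {
                    return request.call();
                } catch (IOException e) {
                    if (attempt >= maxAttempts) {
                        throw e; // retries exhausted, propagate the last failure
                    }
                    Thread.sleep(delayMillis); // the lock is still held here
                }
            }
        } finally {
            busy.set(false); // released exactly once, after success or final failure
        }
    }

    public static void main(String[] args) throws Exception {
        SingleFlightRetrier retrier = new SingleFlightRetrier();
        int[] calls = {0};
        String result = retrier.execute(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated network failure");
            }
            return "done after " + calls[0] + " attempts";
        }, 5, 10L);
        System.out.println(result);
    }
}
```

In the RxJava chain above, the compose(...) call placed after retryWhen plays the role of the outer try/finally here: it is subscribed and unsubscribed once, while retryWhen resubscribes only to what sits upstream of it.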

PS: the applyLocker transformer looks a bit different in the code you posted, i.e. it takes no arguments there.

answered 2016-07-11T14:25:25.523

When using retryWhen, to avoid unsubscribing on onError you have to use onErrorResumeNext, which does not unsubscribe.

Take a look at this example

/**
 * Shows how onErrorResumeNext works: it emits a fallback item when an error occurs in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

Also, regarding concurrency: if you run the operation inside a flatMap operator, you can specify the maximum concurrency.

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
    if (getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
    }
    return merge(map(func), maxConcurrent);
}
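
flatMap with maxConcurrent does not reject extra work; it queues the additional inner observables and subscribes to them as earlier ones complete. A rough plain-Java analogue of that queueing behaviour (not RxJava itself) is a fixed-size thread pool. The sketch below is only an illustration: BoundedConcurrencyDemo and runAndMeasurePeak are hypothetical names, and the 5 ms sleep stands in for a network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class, a plain-Java analogue rather than RxJava code. */
final class BoundedConcurrencyDemo {

    /**
     * Submits taskCount tasks to a pool of maxConcurrent threads and returns
     * the highest number of tasks that were observed running at the same time.
     */
    static int runAndMeasurePeak(int taskCount, int maxConcurrent) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // stand-in for a network request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every queued task to finish
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak with limit 1: " + runAndMeasurePeak(5, 1));
    }
}
```

With a limit of 1, later requests wait their turn instead of failing with ChannelBusyException, which is a different policy from the RequestsLocker in the question.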

You can see more examples here: https://github.com/politrons/reactive

answered 2016-07-11T08:07:13.647

My current solution is to not unlock the RequestsLocker on IOException, because in that case the request will be repeated after the delay.

public <T> Observable.Transformer<T, T> applyLocker() {
    if(!isLocked()) {
        return observable -> observable.doOnSubscribe(() -> {
            lockChannel();
        }).doOnNext(obj -> {
            freeChannel();
        }).doOnError(throwable -> {
            if(throwable instanceof IOException) {
                return; // as any request will be repeated in case of IOException
            }
            freeChannel();
        });
    } else {
        return observable -> Observable.error(new ChannelBusyException("Channel is busy now"));
    }
}
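
The release policy above can be isolated from RxJava and checked on its own. This is a minimal sketch under assumptions: ChannelLock, tryLock, onSuccess and onError are hypothetical names, and using AtomicBoolean.compareAndSet (so that checking and locking happen in one atomic step) is an addition that is not in the code above.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical lock that stays held while an IOException-triggered retry is pending. */
final class ChannelLock {

    private final AtomicBoolean locked = new AtomicBoolean(false);

    /** Returns true if the caller acquired the lock, false if the channel is busy. */
    boolean tryLock() {
        return locked.compareAndSet(false, true);
    }

    boolean isLocked() {
        return locked.get();
    }

    /** Called when the request produced a result: the channel is free again. */
    void onSuccess() {
        locked.set(false);
    }

    /** Called when the request failed. */
    void onError(Throwable error) {
        if (error instanceof IOException) {
            return; // the request will be retried, so keep the channel locked
        }
        locked.set(false); // any other error is final, so release the channel
    }

    public static void main(String[] args) {
        ChannelLock lock = new ChannelLock();
        System.out.println("acquired: " + lock.tryLock());
        lock.onError(new IOException("simulated network failure"));
        System.out.println("still locked during retry delay: " + lock.isLocked());
        lock.onSuccess();
        System.out.println("locked after success: " + lock.isLocked());
    }
}
```

One caveat with this policy: it relies on every IOException being retried. If the retry logic ever gives up on an IOException, something else has to release the lock, otherwise the channel stays busy.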
answered 2016-07-11T10:01:19.207