
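My service works with Couchbase. When I want to update a document, the service first performs a lookup to fetch the document together with its CAS value, and then updates the document. If the update fails with a CASMismatchException, I want to retry the whole lookup-and-update asynchronous request after a delay (using retryWhen). The problem is that the retry only re-invokes the update observable, not the entire lookup-and-update request.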

Here is the retry code:

    public Func1<Observable<? extends Throwable>, Observable<?>> getRetryOnCASMismatchExceptionHandler(String unrecoverableErrorMessage) {
        return observable -> observable
                .zipWith(Observable.range(1, maxAttempts), ImmutablePair::of)
                .flatMap(pair -> {
                    var throwable = pair.left;
                    var attemptsCounter = pair.right;
                    if (throwable instanceof CASMismatchException) {
                        // Retry after a delay that grows linearly with the attempt number
                        return Observable.timer((long) attemptsCounter * backoffMs, TimeUnit.MILLISECONDS);
                    }
                    // Any other error is unrecoverable: wrap it and pass it downstream
                    return Observable.error(new RuntimeException(unrecoverableErrorMessage, throwable));
                });
    }

The update code:

    private Observable<String> updateDetectionWithRetry(DetectionFeed detectionFeed, String userId, String detectionPath)
    {
        return updateDetection(detectionFeed, userId, detectionPath)
            .retryWhen(retryHandlers.getRetryOnCASMismatchExceptionHandler("Failed to update persisted UserAccount with detection data [" + detectionFeed.toString() + "]"));
    }

    private Observable<String> updateDetection(DetectionFeed detectionFeed, String userId, String detectionPath)
    {
        return userRepo
            .lookupDetection(userId, detectionPath)
            // Skip the update when the stored detection is already up to date
            .filter(existingDetection -> isNewDetectionFeed(existingDetection, detectionFeed))
            // Replace using the CAS value obtained from the lookup
            .flatMap(detectionToPersist -> userRepo.replaceDetection(userId, detectionPath,
                     createDetectionToPersist(detectionFeed), detectionToPersist.getCas()));
    }

1 Answer

    Observable.defer(() -> updateDetection(detectionFeed, userId, detectionPath))

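Observable.defer() wraps the method call in an observable and postpones it until subscription. Each time retryWhen resubscribes after an error, the method is called again, so the whole process (lookup and replace) is retried.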

Here is the complete, corrected code:

    private Single<Optional<String>> updateUserAccountDetection(DetectionFeed detectionFeed, String userId, String detectionPath) {
        return Observable.defer(() -> updateDetection(detectionFeed, userId, detectionPath))
                .retryWhen(RetryBuilder
                        .anyOf(CASMismatchException.class)
                        .delay(Delay.fixed(1L, TimeUnit.SECONDS))
                        .max(3)
                        .build())
                .map(Optional::ofNullable)
                .toSingle();
    }
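
To illustrate why it matters that the lookup sits inside the retried unit, here is a minimal library-free sketch. It is an analogy only and does not use RxJava or the Couchbase SDK: `Store`, `CasMismatch`, `retry`, `eagerUpdate` and `deferredUpdate` are all hypothetical names, and a plain `Supplier` plays the role of the deferred observable. The store simulates a concurrent writer that changes the CAS right after the first lookup.

```java
import java.util.function.Supplier;

class DeferRetrySketch {
    /** Hypothetical stand-in for Couchbase's CASMismatchException. */
    static class CasMismatch extends RuntimeException {}

    /** Hypothetical in-memory document guarded by a CAS value. */
    static class Store {
        long cas = 1;
        String value = "old";
        int lookups = 0;

        /** Returns the current CAS; a simulated concurrent writer bumps it after the first read. */
        long lookupCas() {
            lookups++;
            long seen = cas;
            if (lookups == 1) cas++;
            return seen;
        }

        /** Replaces the value only if the caller's CAS is still current. */
        void replace(String newValue, long expectedCas) {
            if (expectedCas != cas) throw new CasMismatch();
            value = newValue;
            cas++;
        }
    }

    /** Calls the supplier again on every CAS mismatch, up to maxAttempts times. */
    static <T> T retry(Supplier<T> attempt, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        CasMismatch last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (CasMismatch e) {
                last = e;
            }
        }
        throw last;
    }

    /** Lookup runs once, outside the retried unit: every retry reuses the stale CAS. */
    static boolean eagerUpdate(Store store) {
        long staleCas = store.lookupCas();
        try {
            retry(() -> { store.replace("new", staleCas); return store.value; }, 3);
            return true;
        } catch (CasMismatch e) {
            return false;
        }
    }

    /** Lookup runs inside the retried unit: every retry reads a fresh CAS. */
    static boolean deferredUpdate(Store store) {
        try {
            retry(() -> {
                long freshCas = store.lookupCas();
                store.replace("new", freshCas);
                return store.value;
            }, 3);
            return true;
        } catch (CasMismatch e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("eager succeeded: " + eagerUpdate(new Store()));
        System.out.println("deferred succeeded: " + deferredUpdate(new Store()));
    }
}
```

In this sketch the eager variant exhausts all three attempts with the same stale CAS, while the deferred variant succeeds on its second attempt because it performs a second lookup.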
answered 2021-09-14T10:38:12.960