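My service works with Couchbase. When I want to update a document, the service first performs a lookup to fetch the document together with its CAS value, and then updates the document. If the update fails with a CASMismatchException, I want to retry the whole lookup-and-update sequence (both are async requests) after a delay, using retryWhen. The problem is that the retry only re-invokes the update observable, not the whole lookup-and-update async request.
Here is the retry code: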
`public Func1<Observable<? extends Throwable>, Observable<?>> getRetryOnCASMismatchExceptionHandler(String unrecoverableErrorMessage) {
    return observable -> observable
        .zipWith(Observable.range(1, maxAttempts), ImmutablePair::of)
        .flatMap(pair -> {
            var throwable = pair.left;
            var attemptsCounter = pair.right;
            if (throwable instanceof CASMismatchException) {
                // Retry after a delay that grows linearly with the attempt number
                return Observable.timer((long) attemptsCounter * backoffMs, TimeUnit.MILLISECONDS);
            }
            // Any other error is not retried: wrap it and pass it downstream
            return Observable.error(new RuntimeException(unrecoverableErrorMessage, throwable));
        });
}`
The update code:
private Observable<String> updateDetectionWithRetry(DetectionFeed detectionFeed, String userId, String detectionPath)
{
    // retryWhen resubscribes to the Observable returned by updateDetection(...)
    return updateDetection(detectionFeed, userId, detectionPath)
        .retryWhen(retryHandlers.getRetryOnCASMismatchExceptionHandler("Failed to update persisted UserAccount with detection data [" + detectionFeed.toString() + "]"));
}
private Observable<String> updateDetection(DetectionFeed detectionFeed, String userId, String detectionPath)
{
    return userRepo
        // Look up the existing detection together with its CAS value
        .lookupDetection(userId, detectionPath)
        .filter(existingDetection -> isNewDetectionFeed(existingDetection, detectionFeed))
        // Replace it, passing the CAS obtained from the lookup
        .flatMap(detectionToPersist -> userRepo.replaceDetection(userId, detectionPath,
            createDetectionToPersist(detectionFeed), detectionToPersist.getCas()));
}
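
To make the intended behavior concrete, here is a minimal plain-Java sketch of what I expect a retry to do: every attempt repeats the lookup, so it sees the current CAS, and only then attempts the replace. It deliberately uses neither RxJava nor Couchbase. `CasRetrySketch`, `Doc`, `Store`, `CasMismatch` and `retryOnCasMismatch` are all hypothetical names made up for this illustration, and the in-memory `Store` only imitates a CAS check. In RxJava terms this corresponds to rebuilding the lookup Observable on every resubscription, which is what `Observable.defer` is meant for; whether that applies here depends on how `userRepo.lookupDetection` creates its Observable, which the code above does not show.

```java
import java.util.function.Supplier;

// Plain-Java sketch; every name here is hypothetical and none of it is Couchbase or RxJava API.
final class CasRetrySketch {

    /** Stand-in for a stored document together with its CAS value. */
    static final class Doc {
        final String body;
        final long cas;
        Doc(String body, long cas) { this.body = body; this.cas = cas; }
    }

    /** Stand-in for Couchbase's CASMismatchException. */
    static final class CasMismatch extends RuntimeException {}

    /** In-memory store whose replace() succeeds only if the caller's CAS is current. */
    static final class Store {
        private Doc current = new Doc("v0", 1L);
        int lookups = 0;

        synchronized Doc lookup() {
            lookups++;
            return current;
        }

        synchronized Doc replace(String body, long expectedCas) {
            if (expectedCas != current.cas) {
                throw new CasMismatch();
            }
            current = new Doc(body, current.cas + 1);
            return current;
        }
    }

    /**
     * Runs the whole attempt (lookup AND replace) again after each CAS mismatch,
     * sleeping attemptNumber * backoffMs in between, up to maxAttempts attempts.
     */
    static <T> T retryOnCasMismatch(Supplier<T> attempt, int maxAttempts, long backoffMs) {
        for (int attemptNumber = 1; ; attemptNumber++) {
            try {
                return attempt.get();
            } catch (CasMismatch e) {
                if (attemptNumber >= maxAttempts) {
                    throw e; // attempts exhausted: surface the mismatch
                }
                try {
                    Thread.sleep(attemptNumber * backoffMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", interrupted);
                }
            }
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        boolean[] otherWriterDone = {false};

        Doc saved = retryOnCasMismatch(() -> {
            Doc seen = store.lookup();            // fresh document and CAS on every attempt
            if (!otherWriterDone[0]) {            // simulate a concurrent writer, once
                otherWriterDone[0] = true;
                store.replace("other", seen.cas);
            }
            return store.replace("mine", seen.cas);
        }, 3, 10L);

        System.out.println(saved.body + " saved after " + store.lookups + " lookups");
    }
}
```

Running `main` prints `mine saved after 2 lookups`: the first attempt fails because the simulated concurrent writer changed the CAS, and the second attempt succeeds only because it performs the lookup again instead of reusing the stale CAS.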