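I have a graph of interdependent asynchronous operations modeled in RxJava. For certain errors, the entire graph should be re-run. The retry(..) operators do not support this directly, because any error is delivered to every subscriber. Since the retry(..) operators simply re-subscribe, they always receive the same error from the final observable, which was computed only once. In other words, re-subscribing does not execute the work again.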
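I tried creating a special observable that invokes an observable-generating method on each subscription. With that in place, the retry operators mostly work as expected, and after adding a cache operator they work exactly as expected.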
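To illustrate the idea outside RxJava, here is a minimal plain-Java sketch of the same pattern: a factory that is invoked again on every attempt, with the final outcome held in a single future so that later readers do not trigger the work again. It uses `CompletableFuture` rather than observables, and the names `RetryPerAttemptSketch`, `retrying`, and `attempt` are hypothetical, chosen only for this example. It is an analogy to the wrapper described above, not RxJava's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryPerAttemptSketch {

    /**
     * Hypothetical helper: invokes the factory once, and again after each
     * failure, allowing up to `retries` re-attempts. The returned future is
     * completed exactly once, so every reader sees the same cached outcome.
     */
    static <T> CompletableFuture<T> retrying(
            Supplier<CompletableFuture<T>> factory, int retries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(factory, retries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> factory,
            int retriesLeft, CompletableFuture<T> result) {
        // Calling the factory here is what rebuilds the work for each attempt.
        factory.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                attempt(factory, retriesLeft - 1, result);
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger called = new AtomicInteger();

        // Stand-in for the real work: fails on the first call, then squares 2.
        Supplier<CompletableFuture<Integer>> work = () -> {
            if (called.getAndIncrement() == 0) {
                CompletableFuture<Integer> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new RuntimeException("we always fail the first time"));
                return failed;
            }
            return CompletableFuture.supplyAsync(() -> 2 * 2);
        };

        CompletableFuture<Integer> promise = retrying(work, 2);
        System.out.println(promise.join());            // prints 4
        System.out.println(promise.join());            // prints 4 (cached, no new work)
        System.out.println("called=" + called.get());  // prints called=2
    }
}
```

The key point is that `retrying` receives a supplier rather than an already-started future, which is what allows each retry to rebuild the work from scratch.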
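However, this seems like such a common need that I suspect I am duplicating something RxJava already provides. I am also concerned about the robustness of my solution, since I am working at a low level and may not know RxJava well enough to do so safely. Another concern is composability: to support all three forms of retry(..), I would need three versions of the wrapper method.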
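One way the three-wrapper concern could be approached is to have a single wrapper take the retry decision as a function, so that the count-based, predicate-based, and perpetual forms become three values of one parameter. The following plain-Java sketch shows that shape with `CompletableFuture`. `RetryPolicySketch`, `upTo`, `always`, `failsOnce`, and this version of `retryAndCache` are hypothetical names for illustration and are not part of RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

public class RetryPolicySketch {

    // Hypothetical policy shape: (number of failed attempts so far, last error)
    // -> true if the work should be attempted again.

    /** Count form: allow at most `retries` re-attempts after the first failure. */
    static BiPredicate<Integer, Throwable> upTo(int retries) {
        return (failures, error) -> failures <= retries;
    }

    /** Perpetual form: keep retrying until the work succeeds. */
    static BiPredicate<Integer, Throwable> always() {
        return (failures, error) -> true;
    }

    /** Single wrapper: re-invokes the factory per attempt and caches the outcome. */
    static <T> CompletableFuture<T> retryAndCache(
            Supplier<CompletableFuture<T>> factory,
            BiPredicate<Integer, Throwable> shouldRetry) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(factory, shouldRetry, 1, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> factory,
            BiPredicate<Integer, Throwable> shouldRetry, int attemptNumber,
            CompletableFuture<T> result) {
        factory.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Futures completed by supplyAsync wrap failures in CompletionException.
            Throwable cause = (error instanceof CompletionException
                    && error.getCause() != null) ? error.getCause() : error;
            if (shouldRetry.test(attemptNumber, cause)) {
                attempt(factory, shouldRetry, attemptNumber + 1, result);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }

    /** Stand-in work for the demo: fails on the first call, then squares 2. */
    static Supplier<CompletableFuture<Integer>> failsOnce() {
        AtomicInteger called = new AtomicInteger();
        return () -> {
            if (called.getAndIncrement() == 0) {
                CompletableFuture<Integer> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("stale version"));
                return failed;
            }
            return CompletableFuture.supplyAsync(() -> 2 * 2);
        };
    }

    public static void main(String[] args) {
        // Count form.
        System.out.println(retryAndCache(failsOnce(), upTo(2)).join()); // prints 4
        // Predicate form: retry only for a particular error type.
        System.out.println(retryAndCache(failsOnce(),
                (failures, error) -> error instanceof IllegalStateException).join()); // prints 4
        // Perpetual form.
        System.out.println(retryAndCache(failsOnce(), always()).join()); // prints 4
    }
}
```

Note that the perpetual policy in this sketch recurses on the calling thread when attempts fail synchronously, so it is only suitable for work that eventually succeeds or fails asynchronously.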
The demo below shows what I am trying to do and how far I have gotten.
Is there a simpler or more idiomatic way (or both) to do this kind of retry in RxJava?
package demo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.util.async.Async;
/**
 * <p>
 * Demonstrates attempts to get RxJava retry to work for an asynchronous work
 * chain. The use case that exposed this problem is reading and writing data
 * with versioning for optimistic concurrency. The work is a series of async I/O
 * operations that must be re-assembled from scratch if a stale version is
 * detected on write.
 * </p>
 *
 * <p>
 * Four cases are demonstrated in this class:
 * </p>
 * <ul>
 * <li>Case 1: perform the work and naively apply a retry operator to the
 * asynchronous work. This fails because the work itself is not retried on
 * re-subscribe.</li>
 * <li>Case 2: wrap the work in an observable that performs it on every
 * subscription. A retry operator applied to the wrapper correctly re-attempts
 * the work on failure. However, every subsequent subscriber to the result
 * causes the work to be performed again.</li>
 * <li>Case 3: Apply the cache operator to the result of the retry operator.
 * This performs as desired.</li>
 * <li>Case 4: Generalize the approach of case 3 and encapsulate it in an
 * observable generator method. This shows that it is difficult to generalize
 * this behavior because each retry operator form (number, predicate, perpetual)
 * will require its own generator method.</li>
 * </ul>
 *
 * <p>
 * NOTE: this code does not work if compiled by the Eclipse (Kepler) compiler
 * for Java 8. I have to compile with javac for it to work. There is some
 * problem with Lambda class naming in the code generated by Eclipse.
 * </p>
 */
public class AsyncRetryDemo {
    public static void main(final String[] args) throws Exception {
        new AsyncRetryDemo().case1();
        new AsyncRetryDemo().case2();
        new AsyncRetryDemo().case3();
        new AsyncRetryDemo().case4();
        // output is:
        //
        // case 1, sub 1: fail (max retries, called=1)
        // case 1, sub 2: fail (max retries, called=1)
        // case 2, sub 1: pass (called=2)
        // case 2, sub 2: fail (called=3)
        // case 3, sub 1: pass (called=2)
        // case 3, sub 2: pass (called=2)
        // case 4, sub 1: pass (called=2)
        // case 4, sub 2: pass (called=2)
    }

    private final AtomicInteger called = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(2);

    /**
     * This represents a sequence of interdependent asynchronous operations that
     * might fail in a way that prescribes a retry (but in this case, all we are
     * doing is squaring an integer asynchronously and failing the first time).
     *
     * @param a
     *            input to the process.
     *
     * @return promise to perform the work and produce either a result or a
     *         suggestion to retry (e.g. a stale version error).
     */
    private Observable<Integer> canBeRetried(final int a) {
        final Observable<Integer> rval;
        if (this.called.getAndIncrement() == 0) {
            rval = Observable.error(new RuntimeException(
                    "we always fail the first time"));
        } else {
            rval = Async.start(() -> a * a);
        }
        return rval;
    }

    private void case1() throws InterruptedException {
        /*
         * In this case, we invoke the observable-creator to get the async
         * promise. Of course, if it fails, any retry will fail as well because
         * the failed result is computed one time and pushed to all subscribers
         * forever.
         *
         * Thus this case fails because the first invocation of canBeRetried(..)
         * always fails.
         */
        final Observable<Integer> o = canBeRetried(2)
                .retry(2);
        check("case 1", o);
        this.done.await();
    }

    private void case2() throws InterruptedException {
        /*
         * In this case, we wrap canBeRetried(..) inside an observable that
         * invokes it on every subscription. So, we get past the retry problem.
         * But every new subscriber after the retry succeeds causes the work to
         * restart.
         */
        final Observable<Integer> o = Observable.create(
                new OnSubscribe<Integer>() {
                    @Override
                    public void call(final Subscriber<? super Integer> child) {
                        canBeRetried(2).subscribe(child);
                    }
                })
                .retry(2);
        check("case 2", o);
        this.done.await();
    }

    private void case3() throws InterruptedException {
        /*
         * In this case, we wrap canBeRetried(..) inside an observable that
         * invokes it on every subscription. So, we get past the retry problem.
         * We cache the result of the retry to solve the extra work problem.
         */
        final Observable<Integer> o = Observable.create(
                new OnSubscribe<Integer>() {
                    @Override
                    public void call(final Subscriber<? super Integer> child) {
                        canBeRetried(2).subscribe(child);
                    }
                })
                .retry(2)
                .cache();
        check("case 3", o);
        this.done.await();
    }

    private void case4() throws InterruptedException {
        /*
         * Same as case 3, but we use retryAndCache(..) to do the work for us.
         */
        final Observable<Integer> o = retryAndCache(() -> canBeRetried(2), 2);
        check("case 4", o);
        this.done.await();
    }

    private void check(final String label, final Observable<Integer> promise) {
        // does the work get retried on failure?
        promise.subscribe(
                v -> {
                    System.out.println(label + ", sub 1: "
                            + (this.called.get() == 2 ? "pass" : "fail")
                            + " (called=" + this.called.get() + ")");
                },
                x -> {
                    System.out.println(label
                            + ", sub 1: fail (max retries, called="
                            + this.called.get() + ")");
                    this.done.countDown();
                }, () -> {
                    this.done.countDown();
                });
        // do subsequent subscribers avoid invoking the work again?
        promise.subscribe(
                v -> {
                    System.out.println(label + ", sub 2: "
                            + (this.called.get() == 2 ? "pass" : "fail")
                            + " (called=" + this.called.get() + ")");
                },
                x -> {
                    System.out.println(label
                            + ", sub 2: fail (max retries, called="
                            + this.called.get() + ")");
                    this.done.countDown();
                }, () -> {
                    this.done.countDown();
                });
    }

    /**
     * Generalized retry and cache for case 4.
     *
     * @param binder
     *            user-provided supplier that assembles and starts the
     *            asynchronous work.
     *
     * @param retries
     *            number of times to retry on error.
     *
     * @return promise to perform the work and retry up to {@code retries}
     *         times on error.
     */
    private static <R> Observable<R> retryAndCache(
            final Func0<Observable<R>> binder, final int retries) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> child) {
                binder.call().subscribe(child);
            }
        })
                .retry(retries)
                .cache();
    }
}