我正在试验 RxJava 和 Java 8 的 CompletableFuture 类,但不太了解如何处理超时条件。
import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;
// ...
Observable<String> doSomethingSlowly() {
CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
// this call may be very slow - if it takes too long,
// we want to time out and cancel it.
return processor.slowExternalCall();
});
return toObservable(task);
}
// ...
doSomethingSlowly()
.single()
.timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));
这基本上有效(如果达到三秒的超时,则发布“超时”)。但是,我还想取消我已经包装的未来任务Observable
- 以 RxJava 为中心的方法可能吗?
我知道一种选择是自己处理超时,使用task.get(3, TimeUnit.SECONDS)
,但我想知道是否可以在 RxJava 中完成所有任务处理。