嗯,这本应是框架直接支持的功能。起初,我以为 CompletionStage.applyToEither 能做类似的事情,但事实证明并非如此。所以我想出了下面这个解决方案:
/**
 * Returns a stage that completes with the value of the first stage in {@code stages} to
 * complete normally.
 *
 * <p>A failing stage does not settle the result while other stages may still succeed. Only
 * when every stage has completed exceptionally does the returned stage complete
 * exceptionally, carrying the failure that was counted last; earlier failures are dropped.
 *
 * @param stages the stages to race; a collection of any {@code CompletionStage} subtype
 *     (for example {@code List<CompletableFuture<Integer>>}) is accepted
 * @param <U> the result type
 * @return a new stage reflecting the first normal completion, or the last failure
 * @throws IllegalArgumentException if {@code stages} is empty
 */
public static <U> CompletionStage<U> firstCompleted(
        Collection<? extends CompletionStage<? extends U>> stages) {
    // The total must be known up front: callbacks on already-completed stages can run
    // while the registration loop below is still in progress.
    final int count = stages.size();
    if (count <= 0) {
        throw new IllegalArgumentException("stages must not be empty");
    }
    final AtomicInteger settled = new AtomicInteger();
    final CompletableFuture<U> future = new CompletableFuture<>();
    BiConsumer<U, Throwable> consumer = (val, exc) -> {
        if (exc == null) {
            // Only the first complete(...) call takes effect; later ones are no-ops.
            future.complete(val);
        } else if (settled.incrementAndGet() >= count) {
            // Every stage failed: complete with the last exception.
            // All exceptions could be aggregated here instead if desired.
            future.completeExceptionally(exc);
        }
    };
    for (CompletionStage<? extends U> item : stages) {
        item.whenComplete(consumer);
    }
    return future;
}
下面是一个完整的可运行示例,用来演示它的实际效果:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
/**
 * Demonstrates {@link #firstCompleted(Collection)}: racing several stages and taking the
 * first one that completes normally, using stages that are completed after a delay by a
 * shared scheduler.
 */
public class Main {
    /**
     * Returns a stage that completes with the value of the first stage in {@code stages} to
     * complete normally.
     *
     * <p>A failing stage does not settle the result while other stages may still succeed.
     * Only when every stage has completed exceptionally does the returned stage complete
     * exceptionally, carrying the failure that was counted last; earlier failures are
     * dropped.
     *
     * @param stages the stages to race; a collection of any {@code CompletionStage} subtype
     *     (for example {@code List<CompletableFuture<Integer>>}) is accepted
     * @param <U> the result type
     * @return a new stage reflecting the first normal completion, or the last failure
     * @throws IllegalArgumentException if {@code stages} is empty
     */
    public static <U> CompletionStage<U> firstCompleted(
            Collection<? extends CompletionStage<? extends U>> stages) {
        // The total must be known up front: callbacks on already-completed stages can run
        // while the registration loop below is still in progress.
        final int count = stages.size();
        if (count <= 0) {
            throw new IllegalArgumentException("stages must not be empty");
        }
        final AtomicInteger settled = new AtomicInteger();
        final CompletableFuture<U> future = new CompletableFuture<>();
        BiConsumer<U, Throwable> consumer = (val, exc) -> {
            if (exc == null) {
                // Only the first complete(...) call takes effect; later ones are no-ops.
                future.complete(val);
            } else if (settled.incrementAndGet() >= count) {
                // Every stage failed: complete with the last exception.
                // All exceptions could be aggregated here instead if desired.
                future.completeExceptionally(exc);
            }
        };
        for (CompletionStage<? extends U> item : stages) {
            item.whenComplete(consumer);
        }
        return future;
    }

    /** Single-threaded scheduler that completes the demo stages once their delay elapses. */
    private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

    /**
     * Creates a stage that completes normally with {@code value} after {@code delay}
     * milliseconds.
     *
     * @param value the value to complete with
     * @param delay the delay in milliseconds
     * @param <U> the result type
     * @return the not-yet-completed stage
     */
    public static <U> CompletionStage<U> delayed(final U value, long delay) {
        CompletableFuture<U> future = new CompletableFuture<>();
        worker.schedule(() -> {
            future.complete(value);
        }, delay, TimeUnit.MILLISECONDS);
        return future;
    }

    /**
     * Creates a stage that completes exceptionally with {@code value} after {@code delay}
     * milliseconds.
     *
     * @param value the failure to complete with
     * @param delay the delay in milliseconds
     * @param <U> the nominal result type of the stage
     * @return the not-yet-completed stage
     */
    public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) {
        CompletableFuture<U> future = new CompletableFuture<>();
        worker.schedule(() -> {
            future.completeExceptionally(value);
        }, delay, TimeUnit.MILLISECONDS);
        return future;
    }

    /**
     * Runs two scenarios: one where a stage succeeds after several others have failed, and
     * one where every stage fails. The scheduler is shut down afterwards so the JVM can exit.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("Started...");
        /*
        // Looks like applyToEither doesn't work as expected.
        // NOTE: this snippet needs a completedExceptionally helper that this class does not
        // define, so it stays commented out.
        CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
        CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
        System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
        */
        try {
            // Scenario 1: three failures arrive before the earliest success (value 2 at
            // 500 ms), so the expected outcome is a normal completion with 2.
            try {
                List<CompletionStage<Integer>> futures = new ArrayList<>();
                futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
                futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
                futures.add(delayed(1, 1000));
                futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
                futures.add(delayed(2, 500));
                futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
                Integer value = firstCompleted(futures).toCompletableFuture().get();
                System.out.println("Completed normally: " + value);
            } catch (Exception ex) {
                System.out.println("Completed exceptionally");
                ex.printStackTrace();
            }
            // Scenario 2: every stage fails, so the result is expected to fail with the
            // failure that settles last (the one scheduled at 400 ms).
            try {
                List<CompletionStage<Integer>> futures = new ArrayList<>();
                futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
                futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
                Integer value = firstCompleted(futures).toCompletableFuture().get();
                System.out.println("Completed normally: " + value);
            } catch (Exception ex) {
                System.out.println("Completed exceptionally");
                ex.printStackTrace();
            }
        } finally {
            // The scheduler's thread is non-daemon, so without a shutdown the JVM would keep
            // running after main returns. Delayed completions still pending at this point
            // are not awaited by anyone and can be discarded.
            worker.shutdownNow();
        }
        System.out.println("End...");
    }
}