为什么不将CompletionStage
s 组合起来,然后将它们提供给 Akka 流?
Source<Pair<String,String>, ?> execute() {
CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);
return Source.completionStage(pairCompletionStage)
.flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}
完整的 PoC - 使用睡眠超时先完成一个或另一个CompletionStage
:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class CompletionStages {
CompletionStage<String> getSingleElement() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "Single Element";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
CompletionStage<List<String>> getListOfElements() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
return Arrays.asList("One", "Two", "Three");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
}
Source<Pair<String,String>, ?> execute() {
CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);
return Source.completionStage(pairCompletionStage)
.flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}
CompletionStage<Done> run(ActorSystem system) {
return execute().runWith(Sink.foreach(System.out::println), system);
}
public static void main(String... args) {
final ActorSystem system = ActorSystem.create();
new CompletionStages().run(system)
.thenRun(system::terminate);
}
}