2

我正在使用带有微服务框架的akka​​,所以我有很多完成阶段的请求。我想从一个微服务中获取一个元素列表,并将它们与另一个微服务中的单个元素一起压缩,这样我最终得到一个 Pair<list item, single element> 的来源。

我不能用普通的 zip 来做到这一点,因为 Source.zip 会在两个源之一完成后立即完成,所以我最终只传播了一个元素。

我不能使用 Source.zipAll,因为这需要我提前定义默认元素。

如果我提前拥有单个元素,我可以使用 Source.repeat 使其重复传播该元素,这意味着 Source.zip 将在元素列表完成时完成,但 Source.repeat 不能完成阶段或 Source.completionStage。

我目前的策略是在 mapConcat 列表元素之前将这些东西压缩在一起。

Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());

return Source.completionStage(anotherService.getListOfElements().invoke)
    .zip(singleElement)
    .flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));

这最终达到了我想要的,但我觉得有很多不必要的重复和同步移动数据。有没有更好的方法来解决我缺少的这个问题?

4

2 回答 2

3

flatMapConcat运算符应该允许您构造 a ,Source.repeat它一旦知道就会重复单个元素。在 Scala 中(Source.future相当于 Scala Source.completionStage:我对 Java lambda 语法不够熟悉,无法用 Java 回答):

val singleElement = Source.future(oneService.getSingleElement)

Source.future(anotherService.getListOfElements)
  .mapConcat(lst => lst)  // unspool the list
  .zip(singleElement.flatMapConcat(element => Source.repeat(element)))
于 2021-01-23T22:05:41.097 回答
1

为什么不将CompletionStages 组合起来,然后将它们提供给 Akka 流?

Source<Pair<String,String>, ?> execute() {
    CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);

    return Source.completionStage(pairCompletionStage)
        .flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}

完整的 PoC - 使用睡眠超时先完成一个或另一个CompletionStage

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class CompletionStages {
    CompletionStage<String> getSingleElement() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "Single Element";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        });
    }

    CompletionStage<List<String>> getListOfElements() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                return Arrays.asList("One", "Two", "Three");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        });
    }

    Source<Pair<String,String>, ?> execute() {
        CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);

        return Source.completionStage(pairCompletionStage)
                .flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
    }

    CompletionStage<Done> run(ActorSystem system) {
        return execute().runWith(Sink.foreach(System.out::println), system);
    }

    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create();
        new CompletionStages().run(system)
                .thenRun(system::terminate);
    }
}
于 2021-01-23T00:05:17.177 回答