0

我有一个数据服务,我正在认真考虑切换到反应模型。这是一个联合查询引擎,可以通过调用一个或多个“解析器”实现来解析查询数据,具体取决于查询类型。

如果我切换到spring-data-mongodb-reactive,那么这些实现中的每一个都必须为以下各项创建多个Flux实例:

  1. 对信息不同部分的查询
  2. 从 #1 查询每个查询的所有数据库

注意:我不想合并每个Flux,因为能够将上面 #1 的查询分开,使最终处理更容易。将所有联合数据库的每个“部分”查询组合起来就可以了,但我必须将每个“部分”的数据分开。我希望这是有道理的。

解释完整的工作流程超出了本文的范围,但我想知道如何创建任意数量的Flux实例,并订阅它们以启动它们,然后等到它们全部完成后再继续处理完整的- 跨所有联合来源检索数据。在 Java 中,我正在寻找类似于CompletableFuture.allOf().

如果我做这样的事情,我是否接近于正确的轨道:

public class ReactiveDataService {
    private static final Supplier<Example<String>> example1 = () -> Example.of("Example 1");
    private static final Supplier<Example<String>> example2 = () -> Example.of("Example 2");
    private static final Supplier<Example<String>> example3 = () -> Example.of("Example 3");
    private static final Supplier<Example<String>> example4 = () -> Example.of("Example 4");
    private static final Supplier<Example<String>> example5 = () -> Example.of("Example 5");
    private final Collection<ReactiveMongoRepository<String, String>> repositories;

    public ReactiveDataService(Collection<ReactiveMongoRepository<String, String>> repositories) {
        this.repositories = repositories;
    }

    private void processFluxes(final Flux<String> flux1, final Flux<String> flux2, final Flux<String> flux3,
                               final Flux<String> flux4, final Flux<String> flux5) {
        // Call service to process flux stuff
    }

    /**
     * For all repositories, combine fluxes that run the same query.
     * Subscribe to each flux immediately to get the query started.
     * Add all fluxes to a container flux that processes the results
     * upon completion.
     * After everything is set up, block until completion.
     */
    public void doQuery() {
        final Flux<String> flux1 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example1.get()));
        flux1.subscribe();

        final Flux<String> flux2 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example2.get()));
        flux2.subscribe();

        final Flux<String> flux3 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example3.get()));
        flux3.subscribe();

        final Flux<String> flux4 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example4.get()));
        flux4.subscribe();

        final Flux<String> flux5 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example5.get()));
        flux5.subscribe();

        final Flux<Flux<String>> fluxes = Flux.just(flux1, flux2, flux3, flux4, flux5)
                .doOnComplete(() -> processFluxes(flux1, flux2, flux3, flux4, flux5));
        fluxes.blockLast();
    }
}
4

1 回答 1

1

这是一个如何使用 Mono.zip 的示例:

    public static void main(String[] args) {
        Flux<String> flux0 = Flux.empty();
        Flux<String> flux1 = Flux.just("1.1", "1.2", "1.3");
        Flux<String> flux2 = Flux.just("2.1", "2.2", "2.3");
        Flux<String> flux3 = Flux.just("3.1", "3.2", "3.3");
        
        Mono.zip(lists -> process(lists), flux0.collectList(), flux1.collectList(), flux2.collectList(), flux3.collectList()).block();
    }
    
    private static String process(Object[] lists) {
        System.out.println("List 0 is " + lists[0]);
        System.out.println("List 1 is " + lists[1]);
        System.out.println("List 2 is " + lists[2]);
        System.out.println("List 3 is " + lists[3]);
        return "output";
    }

输出:

List 0 is []
List 1 is [1.1, 1.2, 1.3]
List 2 is [2.1, 2.2, 2.3]
List 3 is [3.1, 3.2, 3.3]

因此,您可以根据自己的情况进行调整。

请注意,Mono.zip 不能返回 null,这就是我将“输出”作为结果的原因,但如果您不需要任何输出,您可以输入任何您想要的不为 null 的内容。

这个想法是首先将每个Flux<String>转换Mono<List<String>>为 using collectList,然后处理起来会更简单。Mono.zip允许您等待所有操作完成,并将输出处理为Object[]. 您可以将每个对象转换为List<String>用于处理的对象。

于 2020-12-13T10:05:39.330 回答