0

我有一个接受实体 ID 和“分辨率类型”作为参数的模块,然后(主要)通过返回 Fluxes 的多个操作异步收集数据。解析分为多个(主要是再次)异步操作,每个异步操作都用于收集有助于解析的不同数据类型。我说“主要”是异步的,因为某些解析类型需要一些必须同步进行的初步操作,以便为解析的其余异步 Flux 操作提供信息。现在,当这个同步操作发生时,整个异步解析操作的至少一部分可以开始。我想在同步操作发生时启动这些 Flux 操作。然后,一旦同步数据得到解决,我就可以为正在进行的剩余操作获取每个 Flux。某些分辨率类型将使所有 Flux 操作都返回数据,而另一些则收集较少的信息,并且某些 Flux 操作将保持为空。解析操作非常耗时,我希望能够更早地开始一些 Flux 操作,以便我可以稍微压缩时间——这对于我正在完成的工作非常重要。所以急于订阅是理想的,只要我能保证不会错过任何一个项目发射。而且我希望能够更早地开始一些 Flux 操作,以便我可以稍微压缩时间——这对我正在完成的工作非常重要。所以急于订阅是理想的,只要我能保证不会错过任何一个项目发射。而且我希望能够更早地开始一些 Flux 操作,以便我可以稍微压缩时间——这对我正在完成的工作非常重要。所以急于订阅是理想的,只要我能保证不会错过任何一个项目发射。

考虑到这一点,我该如何:

  1. 为解决所有问题所需的每个 Flux 操作创建一个“持有者”或“容器”,并将它们初始化为空(如Flux.empty()
  2. 将项目添加到我可以在上面的项目 1 中创建的任何内容中——它被初始化为空,但我可能想要来自一个或多个有限和异步 Flux 操作的数据,但我不关心将它们分开,它们可以显示为一个流,当我将使用collectList()它们来生成Mono.
  3. 当其中一些Flux操作应该在其他一些操作之前开始时,我该如何启动它们,并确保我不会错过任何数据?例如,如果我开始一个名称解析 Flux,我可以像上面第 2 项那样添加它吗?假设我想开始检索一些数据,然后执行一个同步操作,然后我从同步操作的结果中创建另一个名称解析 Flux,我能否将这个新的 Flux 附加到原始名称解析 Flux 中,因为它将是返回相同的数据类型?我知道Flux.merge(),但如果可能的话,使用一个我可以继续添加的 Flux 引用会很方便。

我需要一个集合对象,比如一个列表,然后使用合并操作吗?最初,我考虑使用ConnectableFlux, 直到我意识到它是用于连接多个订阅者,而不是用于连接多个发布者。我认为连接多个发布者是我需要的一个很好的答案,除非这是一种可以以更好方式处理的常见模式。

我做响应式编程的时间很短,所以请耐心等待我试图描述我想做的事情的方式。如果我能更好地澄清我的意图,请让我知道我不清楚的地方,我很乐意尝试澄清它。提前感谢您的时间和帮助!

编辑:这是最终的 Kotlin 版本,简洁明了:

private val log = KotlinLogging.logger {}

class ReactiveDataService {
    private val createMono: () -> Mono<List<Int>> = {
        Flux.just(9, 8, 7)
            .flatMap {
                Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
                    .parallel()
                    .runOn(Schedulers.boundedElastic())
            }
            .collectList()
            .cache()
    }

    private val processResults: (List<String>, List<String>) -> String =
        { d1, d2 -> "\n\tdownstream 1: $d1\n\tdownstream 2: $d2" }

    private val convert: (List<Int>, Int) -> Flux<String> =
        { data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }

    fun doQuery(): String? {
        val mono = createMono()
        val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
        val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
        return Mono.zip(downstream1, downstream2, processResults).block()
    }
}

fun main() {
    val service = ReactiveDataService()
    val start = System.currentTimeMillis()
    val result = service.doQuery()
    log.info("{}\n\tTotal time: {}ms", result, System.currentTimeMillis() - start)
}

和输出:

downstream 1: [ 66,  39,  40,  88,  97,  35,  70,  91,  27,  12,  84,  37,  35,  15,  45,  27,  85,  22,  55,  89,  81,  21,  43,  62]
downstream 2: [132,  78,  80, 176, 194,  70, 140, 182,  54,  24, 168,  74,  70,  30,  90,  54, 170,  44, 110, 178, 162,  42,  86, 124]
Total time: 209ms
4

1 回答 1

1

这听起来像是反应堆的理想工作。可以使用弹性调度器将同步调用包装为 Flux(或 Monos)返回,以允许它们并行执行。然后使用各种运算符,您可以将它们组合在一起以生成表示结果的单个 Flux。订阅那个 Flux,整个机器就会启动。

我认为您需要使用 Mono.flatMapMany 而不是 Flux.usingWhen。

public class ReactiveDataService {
  public static void main(final String[] args) {
    ReactiveDataService service = new ReactiveDataService();
    service.doQuery();
  }

  private Flux<Integer> process1(final List<Integer> data) {
    return Flux.fromIterable(data);
  }

  private Flux<Integer> process2(final List<Integer> data) {
    return Flux.fromIterable(data).map(i -> i * 2);
  }

  private String process3(List<Integer> downstream1, List<Integer> downstream2) {
    System.out.println("downstream 1: " + downstream1);
    System.out.println("downstream 2: " + downstream2);
    return "Done";
  }

  private void doQuery() {
    final Mono<List<Integer>> mono =
        Flux.just(9, 8, 7)
            .flatMap(
                limit ->
                    Flux.fromStream(
                            Stream.generate(() -> new Random().nextInt(100))
                                .peek(
                                    i -> {
                                      try {
                                        Thread.sleep(500);
                                      } catch (InterruptedException ignored) {
                                      }
                                    })
                                .limit(limit))
                        .parallel()
                        .runOn(Schedulers.boundedElastic()))
            .collectList()
            .cache();
    final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
    final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();
    Mono.zip(downstream1, downstream2, this::process3).block();
  }
}

于 2020-12-31T12:49:48.137 回答