
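We are using project-reactor to retrieve some data from an external web service and generate a bunch of result objects.

First, we need to fetch some masterdata that is required to trigger the next web service calls. Once the masterdata is available, we retrieve more data based on the masterdata results. Next, we have to wait for all Monos to emit their results. Then we process all the data and build our result objects.

We don't have much experience with reactive streams. Our solution with nested subscriptions works, but we believe there may be a better way to achieve what we want to do.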

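Question 1

Masterdata_A and Masterdata_B can be fetched in parallel, but how can this be expressed reactively without nesting? Each result of getFluxMasterdata_B should be combined with the one result of getMonoMasterdata_A.

Question 2

The tuple of both masterdata should be throttled somehow so that the web service is not overwhelmed by too many data requests. The current delay of 1 second is just a guess that seems to work, but it would be better to define the maximum number of parallel executions of the first inner flatMap, so that there are at most N pending web service calls at a time.

Question 3

In the future we may have to fetch more data from the web service to build the ProcessingResult. Is there a best practice for defining reactive streams so that they stay readable and understandable? Is nesting reactive streams fine, or should it be avoided (keeping everything at the top level)?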


Domain model

    private static class Masterdata_A
    {
        private List<MasterdataRecord_A> records;
    }

    private static class MasterdataRecord_A { /* ... business relevant fields */ }
    private static class MasterdataRecord_B { /* ... business relevant fields */ }
    private static class Data_A { /* ... business relevant fields */ }
    private static class Data_B { /* ... business relevant fields */ }
    private static class Data_C { /* ... business relevant fields */ }

    private static class ProcessingResult { /* ... business relevant fields */ }

WebserviceImpl

    private static class Webservice
    {
        private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
        private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }

        private Mono<Data_A> getMonoData_A(Masterdata_A masterdataA) { /* fetch data from external webservice */ }
        private Mono<Data_B> getMonoData_B(MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
        private Mono<Data_C> getMonoData_C(Masterdata_A masterdataA, MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
    }

BusinessService implementation

    public class BusinessService
    {
        public void processData(...params...)
        {
            Webservice webservice = getWebservice();
            // As soon as Mono<Masterdata_A> emits its result AND Flux<MasterdataRecord_B> emits its first result, the first inner flatMap should be executed
            // to fetch some extra data from the service based on the current masterdata.
            // To build the ProcessingResult we need access to all data available in the current context.
            webservice.getMonoMasterdata_A()
                    .subscribe((Masterdata_A masterdataA) -> {
                        webservice.getFluxMasterdata_B()
                                .delayElements(Duration.ofSeconds(1))
                                .flatMap((MasterdataRecord_B masterdataB) -> {
                                    Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                                    Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                                    Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                                    // wait for result of all Monos
                                    return Mono.zip(monoA, monoB, monoC);
                                })
                                .flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
                                    Data_A dataA = data.getT1();
                                    Data_B dataB = data.getT2();
                                    Data_C dataC = data.getT3();

                                    // create result from masterdataA, masterdataB, dataA, dataB, dataC
                                    ProcessingResult result = ...;
                                    return Mono.just(result);
                                })
                                .subscribe(processingResult -> {
                                    // store result to db/filesystem
                                });
                    });
        }
    }

1 Answer


Question 1

    Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
    Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...];
    // then the result will be [(A,1), (A,2), (A,3), ...].
    // monoMasterdata_a and masterdataRecordBFlux are subscribed to in parallel.
    // Caveat: combineLatest only emits once both sources have emitted. Records of B emitted
    // before A arrives are overwritten, so only the latest of them gets paired with A.
    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
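If every record of B must be paired with A regardless of timing, one option is to cache the master Mono and flatMap over the records. Below is a minimal sketch assuming reactor-core 3.x on the classpath; `PairWithMasterdata` and `pairAll` are hypothetical names, and `Mono.delay`/`Flux.just` only simulate a slow Masterdata_A call and a fast MasterdataRecord_B stream. The trade-off is that the master call starts when the first record arrives rather than strictly in parallel, but it still runs only once.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.time.Duration;
import java.util.List;

public class PairWithMasterdata {

    // Pairs every element of `records` with the single value emitted by `master`.
    // cache() subscribes to `master` at most once (on first use) and replays its value
    // to every later subscriber, so the master data is fetched only once even though
    // one inner Mono is created per record.
    static <A, B> Flux<Tuple2<A, B>> pairAll(Mono<A> master, Flux<B> records) {
        Mono<A> cachedMaster = master.cache();
        // flatMapSequential keeps the pairs in the order the records arrived
        return records.flatMapSequential(b -> cachedMaster.map(a -> Tuples.of(a, b)));
    }

    public static void main(String[] args) {
        // Simulated sources: the master value arrives after all records were emitted.
        Mono<String> slowMaster = Mono.delay(Duration.ofMillis(100)).map(tick -> "A");
        Flux<Integer> fastRecords = Flux.just(1, 2, 3);

        // combineLatest only remembers the latest record seen before "A" arrives,
        // so in this situation it can produce fewer pairs than there are records.
        List<Tuple2<String, Integer>> latest =
                Flux.combineLatest(slowMaster, fastRecords, Tuples::of).collectList().block();
        List<Tuple2<String, Integer>> all = pairAll(slowMaster, fastRecords).collectList().block();

        System.out.println("combineLatest: " + latest);
        System.out.println("pairAll:       " + all);
    }
}
```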

Question 2

    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
            // yes, this works fine for not overwhelming the web services: it throttles the rate.
            // 500 ms is an arbitrary value; you need to test and tune it for these services
            .delayElements(Duration.ofMillis(500))
            .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
                        Masterdata_A masterdataA = tuple2.getT1();
                        MasterdataRecord_B masterdataB = tuple2.getT2();
                        Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                        Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                        Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                        // wait for the results of all Monos
                        return Mono.zip(monoA, monoB, monoC);
                    },
                    // flatMap accepts a concurrency argument: at most this many inner Monos
                    // (i.e. pending web service calls) are subscribed at the same time.
                    // 5 is also an arbitrary value; test to find the best one
                    5)
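To check that the concurrency argument of `flatMap` really bounds the number of in-flight calls, here is a small experiment, assuming reactor-core 3.x. `fakeCall` is a hypothetical stand-in for a web service call that simply completes after 50 ms, and the two counters exist only for measurement.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedFlatMap {

    // Number of simulated calls currently in flight, and the highest value observed.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical web service call: counted as in flight from subscription
    // until its single value is emitted ~50 ms later.
    static Mono<Integer> fakeCall(int id) {
        return Mono.delay(Duration.ofMillis(50))
                .map(tick -> id)
                .doOnSubscribe(s -> peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                // doOnNext runs before the value reaches flatMap, i.e. before flatMap
                // requests the next upstream element
                .doOnNext(v -> inFlight.decrementAndGet());
    }

    // flatMap(mapper, concurrency): at most `concurrency` inner Monos are subscribed at once;
    // further upstream elements are only requested as inner Monos complete.
    static int runWithConcurrency(int requests, int concurrency) {
        peak.set(0);
        Long count = Flux.range(1, requests)
                .flatMap(BoundedFlatMap::fakeCall, concurrency)
                .count()
                .block();
        return count.intValue();
    }

    public static void main(String[] args) {
        int completed = runWithConcurrency(20, 5);
        System.out.println("completed=" + completed + ", peak in-flight=" + peak.get());
    }
}
```

With a concurrency limit in place, the web service sees at most N pending calls no matter how fast the masterdata arrives; `delayElements` additionally spaces the calls out in time.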

Question 3

Take a look at this: https://github.com/reactor/reactive-streams-commons/issues/21
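One common way to keep a growing pipeline flat and readable, sketched here under stated assumptions: every asynchronous step gets a named method returning a Mono, everything fetched for one record is carried in a small context object, and pure transformations stay in `map`. All names (`ReadablePipeline`, `Context`, `enrich`, `toResult`, the `fetch*` stubs) are hypothetical, and `String` stands in for the real data and result types. Assumes reactor-core 3.x.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class ReadablePipeline {

    // Hypothetical context: everything fetched for one (masterA, recordB) pair,
    // so later steps don't need nested lambdas to "see" earlier values.
    static final class Context {
        final String masterA;
        final int recordB;
        final String dataA, dataB, dataC;

        Context(String masterA, int recordB, String dataA, String dataB, String dataC) {
            this.masterA = masterA;
            this.recordB = recordB;
            this.dataA = dataA;
            this.dataB = dataB;
            this.dataC = dataC;
        }
    }

    // Stubbed web service calls (assumptions; the real ones would do HTTP requests).
    static Mono<String> fetchMasterA() { return Mono.just("A"); }
    static Flux<Integer> fetchMasterB() { return Flux.just(1, 2, 3); }
    static Mono<String> fetchDataA(String a) { return Mono.just("dA(" + a + ")"); }
    static Mono<String> fetchDataB(int b) { return Mono.just("dB(" + b + ")"); }
    static Mono<String> fetchDataC(String a, int b) { return Mono.just("dC(" + a + "," + b + ")"); }

    // Async step: fetch the extra data for one pair and bundle it into a Context.
    // A new web service call means one more zip argument and one more Context field.
    static Mono<Context> enrich(String masterA, int recordB) {
        return Mono.zip(fetchDataA(masterA), fetchDataB(recordB), fetchDataC(masterA, recordB))
                .map(t -> new Context(masterA, recordB, t.getT1(), t.getT2(), t.getT3()));
    }

    // Pure, synchronous step: belongs in map(), not flatMap().
    static String toResult(Context c) {
        return c.masterA + "/" + c.recordB + ": " + c.dataA + " " + c.dataB + " " + c.dataC;
    }

    // The top-level chain reads as a short list of named steps.
    static Flux<String> pipeline(int concurrency) {
        Mono<String> masterA = fetchMasterA().cache();
        return fetchMasterB()
                .flatMap(b -> masterA.flatMap(a -> enrich(a, b)), concurrency)
                .map(ReadablePipeline::toResult);
    }

    public static void main(String[] args) {
        List<String> results = pipeline(5).collectList().block();
        results.forEach(System.out::println);
    }
}
```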

Complete example

    Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
    Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...];
    // then the result will be [(A,1), (A,2), (A,3), ...].
    // monoMasterdata_a and masterdataRecordBFlux are subscribed to in parallel
    // (records of B emitted before A arrives are collapsed to the latest one).
    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
            // throttles the rate so the web services are not overwhelmed;
            // 500 ms is an arbitrary value, you need to test and tune it for these services
            .delayElements(Duration.ofMillis(500))
            .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
                        Masterdata_A masterdataA = tuple2.getT1();
                        MasterdataRecord_B masterdataB = tuple2.getT2();
                        Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                        Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                        Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                        // wait for the results of all Monos, then build the result here,
                        // where masterdataA and masterdataB are still in scope.
                        // Building the result is a synchronous transformation, so map is the
                        // right choice; flatMap is more expensive because it subscribes to an
                        // inner Publisher for every element.
                        return Mono.zip(monoA, monoB, monoC)
                                .map(data -> {
                                    Data_A dataA = data.getT1();
                                    Data_B dataB = data.getT2();
                                    Data_C dataC = data.getT3();

                                    // create result from masterdataA, masterdataB, dataA, dataB, dataC
                                    ProcessingResult result = ???;
                                    return result;
                                });
                    },
                    // at most 5 inner Monos (pending web service calls) at a time;
                    // 5 is an arbitrary value, test to find the best one
                    5)
            // saving in batches is usually better;
            // 100 is an arbitrary value, pick the one that suits your data source best
            .bufferTimeout(100, Duration.ofMillis(100))
            // concatMap saves one batch at a time, in order.
            // batchSave is blocking; it is assumed to return a Mono that wraps the blocking
            // call (e.g. via Mono.fromRunnable), so that subscribeOn can move it to boundedElastic
            .concatMap(processingResults -> batchSave(processingResults)
                    .subscribeOn(Schedulers.boundedElastic()))
            .subscribe();
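`subscribeOn(Schedulers.boundedElastic())` only helps if the blocking work happens when the returned Mono is subscribed, not already when `batchSave(...)` is called. A minimal sketch of such a `batchSave`, assuming reactor-core 3.x and a hypothetical blocking DAO method `blockingBatchInsert` (simulated here with an in-memory list), wraps the call in `Mono.fromRunnable` so the insert is deferred until subscription:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchSaveSketch {

    // In-memory stand-in for the database: records every saved batch.
    static final List<List<Integer>> savedBatches = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical blocking DAO call (e.g. a JDBC batch insert).
    static void blockingBatchInsert(List<Integer> batch) {
        savedBatches.add(new ArrayList<>(batch));
    }

    // fromRunnable defers the blocking work until subscription, so subscribeOn
    // actually runs it on a boundedElastic thread instead of the caller's thread.
    static Mono<Void> batchSave(List<Integer> batch) {
        return Mono.<Void>fromRunnable(() -> blockingBatchInsert(batch))
                .subscribeOn(Schedulers.boundedElastic());
    }

    static void saveAll(Flux<Integer> results) {
        results
                .bufferTimeout(100, Duration.ofMillis(100)) // batches of at most 100 elements
                .concatMap(BatchSaveSketch::batchSave)      // one batch at a time, in order
                .then()
                .block(); // blocking only to keep this demo simple
    }

    public static void main(String[] args) {
        saveAll(Flux.range(1, 250));
        System.out.println(savedBatches.size() + " batches saved");
    }
}
```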
answered 2020-04-25T22:44:37.447