0

我是 Spring Reactive 编程的新手,我正在开发一个返回 Flux 的 REST 端点。例如:

 @PostMapping
 public Flux<MyResponse> processRequests(@RequestBody List<MyRequest> requests) {

        return Flux.merge(Arrays.asList(dataSource.processRequest(requests.get(0)), dataSource2.processRequest(requests.get(0)))).parallel()
                    .runOn(Schedulers.elastic()).sequential();
}

示例代码中的每个数据源(dataSource 和 dataSource2)都实现了一个如下所示的接口:

public interface MyResponseAdapter {
    Flux<MyResponse> processRequest(MyRequest request);
}

此代码工作正常,因为它按预期返回 Flux,但正如您所见,该代码仅引用 MyRequest 列表中的第一个元素。我需要做的是为 MyRequest 列表中的每个元素构造 Flux.merge。谁能指出我正确的方向?

4

1 回答 1

0

我想我已经确定了一个简单的解决方案:

        List<Flux<MyResponse>> results = new ArrayList<>();
        for (MyRequest myRequest : requests ) {
            results.add(dataSource.processRequest(myRequest));
            results.add(dataSource2.processRequest(myRequest));
        }

        return Flux.merge(results).parallel().runOn(Schedulers.elastic()).sequential();

于 2020-04-22T17:48:49.470 回答