我的 API 对两个独立的服务进行了大约 100 次成对的下游调用。所有响应都需要汇总,然后我才能将响应返回给客户。我使用 hystrix-feign 进行 HTTP 调用。
我想出了我认为是一个优雅的解决方案,直到在rxJava 文档上我发现了以下内容
BlockingObservable 是提供阻塞操作符的各种 Observable。它可以用于测试和演示目的,但通常不适合生产应用程序(如果您认为需要使用 BlockingObservable,这通常表明您应该重新考虑您的设计)。
我的代码大致如下
List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
Observable<C> zipped = Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b));
observables.add(zipped);
}
Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();
Observable
.merge(observables)
.toBlocking()
.forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));
return apiResponse;
基于此设置的几个问题:
- 鉴于我的用例,toBlocking() 是否合理
- 我是否正确理解在主线程到达 forEach() 之前不会进行实际的 HTTP 调用
- 我已经看到 forEach() 块中的代码由不同的线程执行,但我无法验证 forEach() 块中是否可以有多个线程。那里的执行是并发的吗?