0

我的 API 对两个独立的服务进行了大约 100 次成对的下游调用。所有响应都需要汇总,然后我才能将响应返回给客户。我使用 hystrix-feign 进行 HTTP 调用。

我想出了我认为是一个优雅的解决方案,直到在rxJava 文档上我发现了以下内容

BlockingObservable 是提供阻塞操作符的各种 Observable。它可以用于测试和演示目的,但通常不适合生产应用程序(如果您认为需要使用 BlockingObservable,这通常表明您应该重新考虑您的设计)。

我的代码大致如下

List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
    Observable<C> zipped = Observable.zip(
         feignClientA.sendRequest(request.A()),
         feignClientB.sendRequest(request.B()),
         (a, b) -> new C(a,b));
    observables.add(zipped);
}

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();

Observable
    .merge(observables)
    .toBlocking()
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));

return apiResponse;

基于此设置的几个问题:

  1. 鉴于我的用例,toBlocking() 是否合理
  2. 我是否正确理解在主线程到达 forEach() 之前不会进行实际的 HTTP 调用
  3. 我已经看到 forEach() 块中的代码由不同的线程执行,但我无法验证 forEach() 块中是否可以有多个线程。那里的执行是并发的吗?
4

1 回答 1

1

更好的选择是返回Observable以供其他操作员使用,但您可能会摆脱阻塞代码(但是,它应该在后台线程上运行。)

public Observable<D> getAll(Iterable<RequestPair> requests) {
    return Observable.from(requests)
    .flatMap(request ->
        Observable.zip(
            feignClientA.sendRequest(request.A()),
            feignClientB.sendRequest(request.B()),
            (a, b) -> new C(a,b)
        )
    , 8)  // maximum concurrent HTTP requests
    .map(both -> doSomeWork(both));
}

// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
    return getAll(requests)
        .toList()
        .toBlocking()
        .first();
}

我是否正确理解在主线程到达 forEach() 之前不会进行实际的 HTTP 调用

是的,forEach触发了整个操作序列。

我已经看到 forEach() 块中的代码由不同的线程执行,但我无法验证 forEach() 块中是否可以有多个线程。那里的执行是并发的吗?

一次只允许一个线程在其中执行 lambda,forEach但您可能确实会看到不同的线程进入那里。

于 2016-11-23T09:07:25.677 回答