2

我所做的:

我正在使用 vertx rx http 客户端执行大量 HTTP 请求。在这种特定情况下,我调用“方法 A”,它返回一个 ID 列表。要接收我需要多次调用方法 A 以获得下一批结果的所有 ID。(每次我指定一个不同的页码我想接收)

为了提高性能并尽可能并行调用,我创建了一个 (RxJava) Observables 项目列表,每个项目代表单个页面请求的结果。当我完成创建此列表时,我调用 Obserable.zip 运算符并传递 observable 列表。

问题:

使用没有特殊设置的 vertx http 客户端一切正常,但速度非常慢。例如,5 分钟内处理了 3000 个 http 请求。

我尝试通过设置 vertx http 客户端选项来提高性能,如下所示:

 HttpClientOptions options = new HttpClientOptions();

 options.setMaxPoolSize(50)
        .setKeepAlive(true)
        .setPipelining(true)
        .setTcpKeepAlive(true)
        .setPipeliningLimit(25)
        .setMaxWaitQueueSize(10000);

但是当我这样做时,我会得到不稳定的结果:有时一切正常,我能够在不到 20 秒的时间内收到所有响应。但是,有时我都调用的外部服务器会关闭连接,并且日志显示以下错误:

io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
  • 我的代码中没有错误处理程序被调用
  • 当出现此错误时,zip 运算符挂起

这是创建 HttpClientRequest 的代码

public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
        Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
            try {
                HttpClientRequest request = httpClient.postAbs(url);
                addHeadersToRequest(headers, request);
                sendRequest(url, subscriber, request, body);
            }catch (Exception e) {
                try {
                    subscriber.onError(e);
                }catch (Exception ex) {
                    logger.error("error calling onError for subscriber",ex);
                }finally {
                    subscriber.onCompleted();
                }
            }
        });
        return bufferObservable;
    }

private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
        final long requestId = reqNumber.getAndIncrement();

        if (bodyData != null) {
            request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
        }

        request.putHeader("Accept-Encoding", "gzip,deflate");

        Observable<HttpRestResponse> retVal = request.toObservable()
                .doOnError(throwable -> {
                    logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
                }).doOnNext(response -> {
                    if (response != null) {
                        logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
                    }
                }).flatMap(httpClientResponse -> {
                    try {
                        if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
                            Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
                                    .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));

                            return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
                        }
                    } catch (Exception e) {
                        logger.error("error in RestHttpClient", e);
                    }

                    return Observable.just(new HttpRestResponse(null, httpClientResponse));
                });

        retVal.subscribe(subscriber);

        if (bodyData != null) {
            request.end(bodyData); // write post data
        } else {
            request.end();
        }
    }

呸呸呸

4

2 回答 2

0

所以最后我想通了。看来我通过 Observable.zip 方法一个空列表...

这里的问题是 onNext 或 onError 没有在“zip”方法的返回的可观察对象上被调用。在这种情况下,只有 onComplete 被调用,我没有费心为......创建一个处理程序

非常感谢所有感兴趣并试图提供帮助的人。

亚尼夫

于 2017-04-22T19:16:19.213 回答
0

如果你认为你可以像这样连接你的异常逻辑

 try {
            HttpClientRequest request = httpClient.postAbs(url);
            addHeadersToRequest(headers, request);
            sendRequest(url, subscriber, request, body);
        }catch (Exception e) {
            try {
                subscriber.onError(e);
            }catch (Exception ex) {
                logger.error("error calling onError for subscriber",ex);
            }finally {
                subscriber.onCompleted();
            }
        }

您不会收到任何错误,因为基本上您的所有处理现在都驻留在 Rx 生态系统中,因此在您的 try catch 块中不会向您报告任何内容。

此时的错误将来自您的

bufferObservable.onErrorReturn()

或者

bufferObservable.subscribe(success, error)
于 2017-04-22T15:10:11.710 回答