我所做的:
我正在使用 vertx rx http 客户端执行大量 HTTP 请求。在这种特定情况下,我调用“方法 A”,它返回一个 ID 列表。要接收我需要多次调用方法 A 以获得下一批结果的所有 ID。(每次我指定一个不同的页码我想接收)
为了提高性能并尽可能并行调用,我创建了一个 (RxJava) Observables 项目列表,每个项目代表单个页面请求的结果。当我完成创建此列表时,我调用 Obserable.zip 运算符并传递 observable 列表。
问题:
使用没有特殊设置的 vertx http 客户端一切正常,但速度非常慢。例如,5 分钟内处理了 3000 个 http 请求。
我尝试通过设置 vertx http 客户端选项来提高性能,如下所示:
HttpClientOptions options = new HttpClientOptions();
options.setMaxPoolSize(50)
.setKeepAlive(true)
.setPipelining(true)
.setTcpKeepAlive(true)
.setPipeliningLimit(25)
.setMaxWaitQueueSize(10000);
但是当我这样做时,我会得到不稳定的结果:有时一切正常,我能够在不到 20 秒的时间内收到所有响应。但是,有时我都调用的外部服务器会关闭连接,并且日志显示以下错误:
io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
- 我的代码中没有错误处理程序被调用
- 当出现此错误时,zip 运算符挂起
这是创建 HttpClientRequest 的代码
public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
});
return bufferObservable;
}
private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
final long requestId = reqNumber.getAndIncrement();
if (bodyData != null) {
request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
}
request.putHeader("Accept-Encoding", "gzip,deflate");
Observable<HttpRestResponse> retVal = request.toObservable()
.doOnError(throwable -> {
logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
}).doOnNext(response -> {
if (response != null) {
logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
}
}).flatMap(httpClientResponse -> {
try {
if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
.reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
}
} catch (Exception e) {
logger.error("error in RestHttpClient", e);
}
return Observable.just(new HttpRestResponse(null, httpClientResponse));
});
retVal.subscribe(subscriber);
if (bodyData != null) {
request.end(bodyData); // write post data
} else {
request.end();
}
}
呸呸呸