我正在尝试使用 Apache Async Http Client 执行带有表单参数的 MultiPart 请求。
下面是我的 HttpClientManager,
public class AsyncHttpClientManager {
private final PoolingNHttpClientConnectionManager connectionManager;
private static final PropertyFileAccessor PROPERTY_FILE_ACCESSOR = PropertyFileAccessor.getInstance();
private static final TaskExecutor TASK_EXECUTOR = TaskExecutor.getInstance();
private static final Logger LOGGER = Logger.getLogger(AsyncHttpClientManager.class);
private static final ConnectionKeepAliveStrategy KEEP_ALIVE_STRATEGY = new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration (HttpResponse httpResponse, HttpContext httpContext) {
HeaderElementIterator it = new BasicHeaderElementIterator(httpResponse.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && "timeout".equalsIgnoreCase(param)) {
try {
return Long.parseLong(value) * 1000;
} catch (NumberFormatException ignore) {
}
}
}
return TimeUnit.SECONDS.toMillis(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.keep.alive.seconds", String.valueOf(30)));
}
};
private static final AsyncHttpClientManager INSTANCE = new AsyncHttpClientManager();
private ConnectingIOReactor httpIOReactor;
private AsyncHttpClientManager () {
try {
int timeOut = (int) TimeUnit.SECONDS.toMillis(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.keep.alive.seconds", String.valueOf(30)));
IOReactorConfig reactorConfig = IOReactorConfig.custom()
.setTcpNoDelay(true)
.setIoThreadCount(200)
.setSoTimeout(timeOut)
.setSoReuseAddress(true)
.setConnectTimeout(timeOut)
.build();
httpIOReactor = new DefaultConnectingIOReactor(reactorConfig);
((DefaultConnectingIOReactor) httpIOReactor).setExceptionHandler(new IOReactorExceptionHandler() {
@Override
public boolean handle (IOException e) {
logException(e);
return false;
}
@Override
public boolean handle (RuntimeException e) {
logException(e);
return false;
}
private void logException (Exception e) {
LOGGER.error("Ion", e);
}
});
HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);
TASK_EXECUTOR.executeTask(() -> {
try {
httpIOReactor.execute(ioEventDispatch);
} catch (IOException e) {
LOGGER.error("Unable To Instantiate Async HTTP Client", e);
throw new RuntimeException(e);
}
});
connectionManager = new PoolingNHttpClientConnectionManager(httpIOReactor);
connectionManager.setMaxTotal(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.max.count", String.valueOf(10)));
connectionManager.setDefaultMaxPerRoute(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.default.per.route", String.valueOf(5)));
} catch (IOException e) {
LOGGER.error("Unable To Instantiate Async HTTP Client", e);
throw new RuntimeException(e);
}
}
public CloseableHttpAsyncClient getClient () {
return HttpAsyncClients
.custom()
.setConnectionManager(connectionManager)
.setKeepAliveStrategy(KEEP_ALIVE_STRATEGY)
.setConnectionReuseStrategy(new DefaultConnectionReuseStrategy())
.setConnectionManagerShared(true)
.build();
}
public synchronized void shutDown () throws IOException {
if (httpIOReactor != null) {
httpIOReactor.shutdown();
}
if (connectionManager != null) {
connectionManager.shutdown();
}
}
public static AsyncHttpClientManager getInstance () {
return INSTANCE;
}
}
下面的部分是我如何尝试调用我的 api,
File newsLetter = null;
try {
CloseableHttpAsyncClient closeableHttpClient = ASYNC_HTTP_CLIENT_MANAGER.getClient();
closeableHttpClient.start();
newsLetter = new File(inputObject.getFileLocation());
MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
...
multipartEntityBuilder.addBinaryBody("newsletter_file", newsLetter, ContentType.DEFAULT_BINARY, newsLetter.getName());
ZeroCopyPost zeroCopyPost = new ZeroCopyPost(url, newsLetter, ContentType.DEFAULT_BINARY) {
@Override
protected HttpEntityEnclosingRequest createRequest (URI requestURI, HttpEntity entity) {
HttpEntityEnclosingRequest request = super.createRequest(requestURI, entity);
request.setEntity(multipartEntityBuilder.build());
return request;
}
};
final File finalNewsLetter = newsLetter;
closeableHttpClient.execute(zeroCopyPost, null, new FutureCallback <HttpResponse>() {
@Override
public void completed (HttpResponse httpResponse) {
int returnCode = httpResponse.getStatusLine().getStatusCode();
try {
handleResponse(inputObject, finalNewsLetter, httpResponse, returnCode);
} catch (ProcessingException | IOException e) {
failed(e);
}
}
@Override
public void failed (Exception e) {
handleError(inputObject, finalNewsLetter, e.getMessage());
}
@Override
public void cancelled () {
handleError(inputObject, finalNewsLetter, "Request Cancelled");
}
});
} catch (Exception e) {
handleError(inputObject, newsLetter, "FAILED to send email" + e.getMessage());
}
我面临的问题是每当我尝试连接到我得到SocketTimeoutException
的 api 时,虽然 api 能够响应我的邮递员请求。如果我遗漏了什么,请告诉我。