0

我正在尝试使用 Apache Async Http Client 执行带有表单参数的 MultiPart 请求。

下面是我的 HttpClientManager,

public class AsyncHttpClientManager {

    private final        PoolingNHttpClientConnectionManager connectionManager;
    private static final PropertyFileAccessor                PROPERTY_FILE_ACCESSOR = PropertyFileAccessor.getInstance();
    private static final TaskExecutor                        TASK_EXECUTOR          = TaskExecutor.getInstance();
    private static final Logger                              LOGGER                 = Logger.getLogger(AsyncHttpClientManager.class);
    private static final ConnectionKeepAliveStrategy         KEEP_ALIVE_STRATEGY    = new ConnectionKeepAliveStrategy() {
        @Override
        public long getKeepAliveDuration (HttpResponse httpResponse, HttpContext httpContext) {
            HeaderElementIterator it = new BasicHeaderElementIterator(httpResponse.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                HeaderElement he    = it.nextElement();
                String        param = he.getName();
                String        value = he.getValue();
                if (value != null && "timeout".equalsIgnoreCase(param)) {
                    try {
                        return Long.parseLong(value) * 1000;
                    } catch (NumberFormatException ignore) {
                    }
                }
            }
            return TimeUnit.SECONDS.toMillis(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.keep.alive.seconds", String.valueOf(30)));
        }
    };
    private static final AsyncHttpClientManager              INSTANCE               = new AsyncHttpClientManager();
    private              ConnectingIOReactor                 httpIOReactor;


    private AsyncHttpClientManager () {
        try {
            int timeOut = (int) TimeUnit.SECONDS.toMillis(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.keep.alive.seconds", String.valueOf(30)));
            IOReactorConfig reactorConfig = IOReactorConfig.custom()
                                                           .setTcpNoDelay(true)
                                                           .setIoThreadCount(200)
                                                           .setSoTimeout(timeOut)
                                                           .setSoReuseAddress(true)
                                                           .setConnectTimeout(timeOut)
                                                           .build();
            httpIOReactor = new DefaultConnectingIOReactor(reactorConfig);
            ((DefaultConnectingIOReactor) httpIOReactor).setExceptionHandler(new IOReactorExceptionHandler() {
                @Override
                public boolean handle (IOException e) {
                    logException(e);
                    return false;
                }


                @Override
                public boolean handle (RuntimeException e) {
                    logException(e);
                    return false;
                }


                private void logException (Exception e) {
                    LOGGER.error("Ion", e);
                }
            });
            HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
            final IOEventDispatch    ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);
            TASK_EXECUTOR.executeTask(() -> {
                try {
                    httpIOReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                    LOGGER.error("Unable To Instantiate Async HTTP Client", e);
                    throw new RuntimeException(e);
                }
            });
            connectionManager = new PoolingNHttpClientConnectionManager(httpIOReactor);
            connectionManager.setMaxTotal(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.max.count", String.valueOf(10)));
            connectionManager.setDefaultMaxPerRoute(PROPERTY_FILE_ACCESSOR.getIntegerValue("http.connection.default.per.route", String.valueOf(5)));
        } catch (IOException e) {
            LOGGER.error("Unable To Instantiate Async HTTP Client", e);
            throw new RuntimeException(e);
        }
    }


    public CloseableHttpAsyncClient getClient () {
        return HttpAsyncClients
                .custom()
                .setConnectionManager(connectionManager)
                .setKeepAliveStrategy(KEEP_ALIVE_STRATEGY)
                .setConnectionReuseStrategy(new DefaultConnectionReuseStrategy())
                .setConnectionManagerShared(true)
                .build();
    }


    public synchronized void shutDown () throws IOException {
        if (httpIOReactor != null) {
            httpIOReactor.shutdown();
        }
        if (connectionManager != null) {
            connectionManager.shutdown();
        }
    }


    public static AsyncHttpClientManager getInstance () {
        return INSTANCE;
    }
}

下面的部分是我如何尝试调用我的 api,

File newsLetter = null;
        try {
            CloseableHttpAsyncClient closeableHttpClient = ASYNC_HTTP_CLIENT_MANAGER.getClient();
            closeableHttpClient.start();
            newsLetter = new File(inputObject.getFileLocation());
            MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
            ...
            multipartEntityBuilder.addBinaryBody("newsletter_file", newsLetter, ContentType.DEFAULT_BINARY, newsLetter.getName());

            ZeroCopyPost zeroCopyPost = new ZeroCopyPost(url, newsLetter, ContentType.DEFAULT_BINARY) {
                @Override
                protected HttpEntityEnclosingRequest createRequest (URI requestURI, HttpEntity entity) {
                    HttpEntityEnclosingRequest request = super.createRequest(requestURI, entity);
                    request.setEntity(multipartEntityBuilder.build());
                    return request;
                }
            };


            final File finalNewsLetter = newsLetter;

            closeableHttpClient.execute(zeroCopyPost, null, new FutureCallback <HttpResponse>() {
                @Override
                public void completed (HttpResponse httpResponse) {

                    int returnCode = httpResponse.getStatusLine().getStatusCode();
                    try {
                        handleResponse(inputObject, finalNewsLetter, httpResponse, returnCode);
                    } catch (ProcessingException | IOException e) {
                        failed(e);
                    }
                }


                @Override
                public void failed (Exception e) {
                    handleError(inputObject, finalNewsLetter, e.getMessage());
                }


                @Override
                public void cancelled () {
                    handleError(inputObject, finalNewsLetter, "Request Cancelled");
                }
            });

        } catch (Exception e) {
            handleError(inputObject, newsLetter, "FAILED to send email" + e.getMessage());
        }

我面临的问题是每当我尝试连接到我得到SocketTimeoutException的 api 时,虽然 api 能够响应我的邮递员请求。如果我遗漏了什么,请告诉我。

4

0 回答 0