2

我正在构建一个同时使用AWS Elasticsearch 服务AWS DocumentDB(与 mongoDB 兼容)的应用程序。

为了连接到 Elasticsearch 服务,我复制了 awsdocs 提供的示例项目:https ://github.com/awsdocs/amazon-elasticsearch-service-developer-guide/blob/master/sample_code/java/amazon-es- docs-sample-client.zip

示例项目包含两个文件AWSRequestSigningApacheInterceptor.java,它充当普通 ElasticSearch RestClient 的代理,以及一个名为AmazonElasticsearchServiceClient.java的示例测试。这些张贴在底部。我已经尝试过了,它工作正常。

现在解决问题:

为了通过 SSL 连接到 DocumentDB,我需要使用 System.setProperties(),这似乎会破坏 Elasticsearch 服务连接。类似于 AWS 描述的以编程方式连接到 DocumentDB 集群的过程,我使用 SSL 上下文帮助程序来设置密钥库并将连接到 DocumentDB 所需的 pem 文件放入其中。运行文件SSLContextHelper.java会中断 ElasticSearch 连接,从而产生以下错误:

javax.net.ssl.SSLHandshakeException:一般 SSLEngine 问题原因:sun.security.validator.ValidatorException:PKIX 路径构建失败:sun.security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径原因:sun .security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径

我认为这个问题可能是因为 AWSRequestSigningApacheInterceptor.java 利用了与信任库相关的系统属性。虽然我不知道如何解决它。有什么建议么?

AWSRequestSigningApacheInterceptor.java

/*
 * Copyright 2012-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
 * the License. A copy of the License is located at
 *
 * http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package com.amazonaws.http;

import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.Signer;
import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

/**
 * An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
 * and {@link AWSCredentialsProvider}.
 */
public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor {
    /**
     * The service that we're connecting to. Technically not necessary.
     * Could be used by a future Signer, though.
     */
    private final String service;

    /**
     * The particular signer implementation.
     */
    private final Signer signer;

    /**
     * The source of AWS credentials for signing.
     */
    private final AWSCredentialsProvider awsCredentialsProvider;

    /**
     *
     * @param service service that we're connecting to
     * @param signer particular signer implementation
     * @param awsCredentialsProvider source of AWS credentials for signing
     */
    public AWSRequestSigningApacheInterceptor(final String service,
                                final Signer signer,
                                final AWSCredentialsProvider awsCredentialsProvider) {
        this.service = service;
        this.signer = signer;
        this.awsCredentialsProvider = awsCredentialsProvider;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void process(final HttpRequest request, final HttpContext context)
            throws HttpException, IOException {
        URIBuilder uriBuilder;
        try {
            uriBuilder = new URIBuilder(request.getRequestLine().getUri());
        } catch (URISyntaxException e) {
            throw new IOException("Invalid URI" , e);
        }

        // Copy Apache HttpRequest to AWS DefaultRequest
        DefaultRequest<?> signableRequest = new DefaultRequest<>(service);

        HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);
        if (host != null) {
            signableRequest.setEndpoint(URI.create(host.toURI()));
        }
        final HttpMethodName httpMethod =
                HttpMethodName.fromValue(request.getRequestLine().getMethod());
        signableRequest.setHttpMethod(httpMethod);
        try {
            signableRequest.setResourcePath(uriBuilder.build().getRawPath());
        } catch (URISyntaxException e) {
            throw new IOException("Invalid URI" , e);
        }

        if (request instanceof HttpEntityEnclosingRequest) {
            HttpEntityEnclosingRequest httpEntityEnclosingRequest =
                    (HttpEntityEnclosingRequest) request;
            if (httpEntityEnclosingRequest.getEntity() != null) {
                signableRequest.setContent(httpEntityEnclosingRequest.getEntity().getContent());
            }
        }
        signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams()));
        signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));

        // Sign it
        signer.sign(signableRequest, awsCredentialsProvider.getCredentials());

        // Now copy everything back
        request.setHeaders(mapToHeaderArray(signableRequest.getHeaders()));
        if (request instanceof HttpEntityEnclosingRequest) {
            HttpEntityEnclosingRequest httpEntityEnclosingRequest =
                    (HttpEntityEnclosingRequest) request;
            if (httpEntityEnclosingRequest.getEntity() != null) {
                BasicHttpEntity basicHttpEntity = new BasicHttpEntity();
                basicHttpEntity.setContent(signableRequest.getContent());
                httpEntityEnclosingRequest.setEntity(basicHttpEntity);
            }
        }
    }

    /**
     *
     * @param params list of HTTP query params as NameValuePairs
     * @return a multimap of HTTP query params
     */
    private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) {
        Map<String, List<String>> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (NameValuePair nvp : params) {
            List<String> argsList =
                    parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>());
            argsList.add(nvp.getValue());
        }
        return parameterMap;
    }

    /**
     * @param headers modeled Header objects
     * @return a Map of header entries
     */
    private static Map<String, String> headerArrayToMap(final Header[] headers) {
        Map<String, String> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (Header header : headers) {
            if (!skipHeader(header)) {
                headersMap.put(header.getName(), header.getValue());
            }
        }
        return headersMap;
    }

    /**
     * @param header header line to check
     * @return true if the given header should be excluded when signing
     */
    private static boolean skipHeader(final Header header) {
        return ("content-length".equalsIgnoreCase(header.getName())
                && "0".equals(header.getValue())) // Strip Content-Length: 0
                || "host".equalsIgnoreCase(header.getName()); // Host comes from endpoint
    }

    /**
     * @param mapHeaders Map of header entries
     * @return modeled Header objects
     */
    private static Header[] mapToHeaderArray(final Map<String, String> mapHeaders) {
        Header[] headers = new Header[mapHeaders.size()];
        int i = 0;
        for (Map.Entry<String, String> headerEntry : mapHeaders.entrySet()) {
            headers[i++] = new BasicHeader(headerEntry.getKey(), headerEntry.getValue());
        }
        return headers;
    }
}

AmazonElasticsearchServiceClient.java

package com.amazonaws.samples;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.http.AWSRequestSigningApacheInterceptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class AmazonElasticsearchServiceClient {

    private static final String serviceName = "es";
    private static final String region = "us-west-2";
    private static final String aesEndpoint = ""; // e.g. https://search-mydomain.us-west-1.es.amazonaws.com
    private static final String type = "_doc";

    static final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();

    public static void main(String[] args) throws IOException {

        RestHighLevelClient aesClient = aesClient();

        String index = "java-client-test-index";

        // Create a document that simulates a simple log line from a web server
        Map<String, Object> document = new HashMap<>();
        document.put("method", "GET");
        document.put("client_ip_address", "123.456.78.90");
        document.put("timestamp", "10/Oct/2000:14:56:14 -0700");

        System.out.println("Demoing a single index request:");
        String id = "1";
        IndexRequest indexRequest = new IndexRequest(index, type, id).source(document);
        IndexResponse indexResponse = aesClient.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexResponse.toString());

        System.out.println("Demoing a 1 MB bulk request:");
        BulkRequest bulkRequest = new BulkRequest();

        // Add documents (the simple log line from earlier) to the request until it exceeds 1 MB
        while (bulkRequest.estimatedSizeInBytes() < 1000000) {
            // By not specifying an ID, these documents get auto-assigned IDs
            bulkRequest.add(new IndexRequest(index, type).source(document));
        }

        try {
            // Send the request and get the response
            BulkResponse bulkResponse = aesClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Check the response for failures
            if (bulkResponse.hasFailures()) {
                System.out.println("Encountered failures:");
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        System.out.println(bulkItemResponse.getFailureMessage());
                    }
                }
            }
            else {
                System.out.println("No failures!");
                // Uncomment these lines for a line-by-line summary
//                for (BulkItemResponse bulkItemResponse : bulkResponse) {
//                    System.out.println(bulkItemResponse.getResponse().toString());
//                }
            }
        }

        // Usually happens when the request size is too large
        catch (ElasticsearchStatusException e) {
            System.out.println("Encountered exception:");
            System.out.println(e.toString());
        }
    }

    // Adds the interceptor to the Elasticsearch REST client
    public static RestHighLevelClient aesClient() {
        AWS4Signer signer = new AWS4Signer();
        signer.setServiceName(serviceName);
        signer.setRegionName(region);
        HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider);
        return new RestHighLevelClient(RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor)));
    }
}

SSLContextHelper.java

package com.amazonaws.http;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;



public class SSLContextHelper {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String DEFAULT_SSL_CERTIFICATE = "rds-combined-ca-bundle.pem";
    private static final String SSL_CERTIFICATE = "sslCertificate";
    private static final String KEY_STORE_TYPE = "JKS";
    private static final String KEY_STORE_PROVIDER = "SUN";
    private static final String KEY_STORE_FILE_PREFIX = "sys-connect-via-ssl-test-cacerts";
    private static final String KEY_STORE_FILE_SUFFIX = ".jks";
    private static final String DEFAULT_KEY_STORE_PASSWORD = "changeit";
    private static final String SSL_TRUST_STORE = "javax.net.ssl.trustStore";
    private static final String SSL_TRUST_STORE_PASSWORD = "javax.net.ssl.trustStorePassword";
    private static final String SSL_TRUST_STORE_TYPE = "javax.net.ssl.trustStoreType";


    public static void setSslProperties() {

        try {
            String sslCertificate= System.getProperty(SSL_CERTIFICATE);
            if(StringUtils.isEmpty(sslCertificate)) {
                sslCertificate= DEFAULT_SSL_CERTIFICATE;
            }
            logger.info(" ssl certificate path {}",sslCertificate);
            System.setProperty(SSL_TRUST_STORE, createKeyStoreFile(sslCertificate));
            System.setProperty(SSL_TRUST_STORE_TYPE, KEY_STORE_TYPE);
            System.setProperty(SSL_TRUST_STORE_PASSWORD, DEFAULT_KEY_STORE_PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static String createKeyStoreFile(String sslCertificate) throws Exception {
        return createKeyStoreFile(createCertificate(sslCertificate)).getPath();
    }

    private static X509Certificate createCertificate(String sslCertificate) throws Exception {
        CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
        URL url = new File(sslCertificate).toURI().toURL();
        if (url == null) {
            throw new Exception();
        }
        try (InputStream certInputStream = url.openStream()) {
            return (X509Certificate) certFactory.generateCertificate(certInputStream);
        }
    }

    private static File createKeyStoreFile(X509Certificate rootX509Certificate) throws Exception {
        File keyStoreFile = File.createTempFile(KEY_STORE_FILE_PREFIX, KEY_STORE_FILE_SUFFIX);
        try (FileOutputStream fos = new FileOutputStream(keyStoreFile.getPath())) {
            KeyStore ks = KeyStore.getInstance(KEY_STORE_TYPE, KEY_STORE_PROVIDER);
            ks.load(null);
            ks.setCertificateEntry("rootCaCertificate", rootX509Certificate);
            ks.store(fos, DEFAULT_KEY_STORE_PASSWORD.toCharArray());
        }
        return keyStoreFile;
    }

}
4

0 回答 0