我正在构建一个同时使用AWS Elasticsearch 服务和AWS DocumentDB(与 mongoDB 兼容)的应用程序。
为了连接到 Elasticsearch 服务,我复制了 awsdocs 提供的示例项目:https ://github.com/awsdocs/amazon-elasticsearch-service-developer-guide/blob/master/sample_code/java/amazon-es- docs-sample-client.zip
示例项目包含两个文件AWSRequestSigningApacheInterceptor.java,它充当普通 ElasticSearch RestClient 的代理,以及一个名为AmazonElasticsearchServiceClient.java的示例测试。这些张贴在底部。我已经尝试过了,它工作正常。
现在解决问题:
为了通过 SSL 连接到 DocumentDB,我需要使用 System.setProperties(),这似乎会破坏 Elasticsearch 服务连接。类似于 AWS 描述的以编程方式连接到 DocumentDB 集群的过程,我使用 SSL 上下文帮助程序来设置密钥库并将连接到 DocumentDB 所需的 pem 文件放入其中。运行文件SSLContextHelper.java会中断 ElasticSearch 连接,从而产生以下错误:
javax.net.ssl.SSLHandshakeException:一般 SSLEngine 问题原因:sun.security.validator.ValidatorException:PKIX 路径构建失败:sun.security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径原因:sun .security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径
我认为这个问题可能是因为 AWSRequestSigningApacheInterceptor.java 利用了与信任库相关的系统属性。虽然我不知道如何解决它。有什么建议么?
AWSRequestSigningApacheInterceptor.java
/*
* Copyright 2012-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package com.amazonaws.http;
import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.Signer;
import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;
/**
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AWSCredentialsProvider}.
*/
public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor {
/**
* The service that we're connecting to. Technically not necessary.
* Could be used by a future Signer, though.
*/
private final String service;
/**
* The particular signer implementation.
*/
private final Signer signer;
/**
* The source of AWS credentials for signing.
*/
private final AWSCredentialsProvider awsCredentialsProvider;
/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
*/
public AWSRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AWSCredentialsProvider awsCredentialsProvider) {
this.service = service;
this.signer = signer;
this.awsCredentialsProvider = awsCredentialsProvider;
}
/**
* {@inheritDoc}
*/
@Override
public void process(final HttpRequest request, final HttpContext context)
throws HttpException, IOException {
URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getRequestLine().getUri());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI" , e);
}
// Copy Apache HttpRequest to AWS DefaultRequest
DefaultRequest<?> signableRequest = new DefaultRequest<>(service);
HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);
if (host != null) {
signableRequest.setEndpoint(URI.create(host.toURI()));
}
final HttpMethodName httpMethod =
HttpMethodName.fromValue(request.getRequestLine().getMethod());
signableRequest.setHttpMethod(httpMethod);
try {
signableRequest.setResourcePath(uriBuilder.build().getRawPath());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI" , e);
}
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest httpEntityEnclosingRequest =
(HttpEntityEnclosingRequest) request;
if (httpEntityEnclosingRequest.getEntity() != null) {
signableRequest.setContent(httpEntityEnclosingRequest.getEntity().getContent());
}
}
signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams()));
signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));
// Sign it
signer.sign(signableRequest, awsCredentialsProvider.getCredentials());
// Now copy everything back
request.setHeaders(mapToHeaderArray(signableRequest.getHeaders()));
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest httpEntityEnclosingRequest =
(HttpEntityEnclosingRequest) request;
if (httpEntityEnclosingRequest.getEntity() != null) {
BasicHttpEntity basicHttpEntity = new BasicHttpEntity();
basicHttpEntity.setContent(signableRequest.getContent());
httpEntityEnclosingRequest.setEntity(basicHttpEntity);
}
}
}
/**
*
* @param params list of HTTP query params as NameValuePairs
* @return a multimap of HTTP query params
*/
private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) {
Map<String, List<String>> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (NameValuePair nvp : params) {
List<String> argsList =
parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>());
argsList.add(nvp.getValue());
}
return parameterMap;
}
/**
* @param headers modeled Header objects
* @return a Map of header entries
*/
private static Map<String, String> headerArrayToMap(final Header[] headers) {
Map<String, String> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Header header : headers) {
if (!skipHeader(header)) {
headersMap.put(header.getName(), header.getValue());
}
}
return headersMap;
}
/**
* @param header header line to check
* @return true if the given header should be excluded when signing
*/
private static boolean skipHeader(final Header header) {
return ("content-length".equalsIgnoreCase(header.getName())
&& "0".equals(header.getValue())) // Strip Content-Length: 0
|| "host".equalsIgnoreCase(header.getName()); // Host comes from endpoint
}
/**
* @param mapHeaders Map of header entries
* @return modeled Header objects
*/
private static Header[] mapToHeaderArray(final Map<String, String> mapHeaders) {
Header[] headers = new Header[mapHeaders.size()];
int i = 0;
for (Map.Entry<String, String> headerEntry : mapHeaders.entrySet()) {
headers[i++] = new BasicHeader(headerEntry.getKey(), headerEntry.getValue());
}
return headers;
}
}
AmazonElasticsearchServiceClient.java
package com.amazonaws.samples;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.http.AWSRequestSigningApacheInterceptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class AmazonElasticsearchServiceClient {
private static final String serviceName = "es";
private static final String region = "us-west-2";
private static final String aesEndpoint = ""; // e.g. https://search-mydomain.us-west-1.es.amazonaws.com
private static final String type = "_doc";
static final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
public static void main(String[] args) throws IOException {
RestHighLevelClient aesClient = aesClient();
String index = "java-client-test-index";
// Create a document that simulates a simple log line from a web server
Map<String, Object> document = new HashMap<>();
document.put("method", "GET");
document.put("client_ip_address", "123.456.78.90");
document.put("timestamp", "10/Oct/2000:14:56:14 -0700");
System.out.println("Demoing a single index request:");
String id = "1";
IndexRequest indexRequest = new IndexRequest(index, type, id).source(document);
IndexResponse indexResponse = aesClient.index(indexRequest, RequestOptions.DEFAULT);
System.out.println(indexResponse.toString());
System.out.println("Demoing a 1 MB bulk request:");
BulkRequest bulkRequest = new BulkRequest();
// Add documents (the simple log line from earlier) to the request until it exceeds 1 MB
while (bulkRequest.estimatedSizeInBytes() < 1000000) {
// By not specifying an ID, these documents get auto-assigned IDs
bulkRequest.add(new IndexRequest(index, type).source(document));
}
try {
// Send the request and get the response
BulkResponse bulkResponse = aesClient.bulk(bulkRequest, RequestOptions.DEFAULT);
// Check the response for failures
if (bulkResponse.hasFailures()) {
System.out.println("Encountered failures:");
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
System.out.println(bulkItemResponse.getFailureMessage());
}
}
}
else {
System.out.println("No failures!");
// Uncomment these lines for a line-by-line summary
// for (BulkItemResponse bulkItemResponse : bulkResponse) {
// System.out.println(bulkItemResponse.getResponse().toString());
// }
}
}
// Usually happens when the request size is too large
catch (ElasticsearchStatusException e) {
System.out.println("Encountered exception:");
System.out.println(e.toString());
}
}
// Adds the interceptor to the Elasticsearch REST client
public static RestHighLevelClient aesClient() {
AWS4Signer signer = new AWS4Signer();
signer.setServiceName(serviceName);
signer.setRegionName(region);
HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider);
return new RestHighLevelClient(RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor)));
}
}
SSLContextHelper.java
package com.amazonaws.http;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
public class SSLContextHelper {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String DEFAULT_SSL_CERTIFICATE = "rds-combined-ca-bundle.pem";
private static final String SSL_CERTIFICATE = "sslCertificate";
private static final String KEY_STORE_TYPE = "JKS";
private static final String KEY_STORE_PROVIDER = "SUN";
private static final String KEY_STORE_FILE_PREFIX = "sys-connect-via-ssl-test-cacerts";
private static final String KEY_STORE_FILE_SUFFIX = ".jks";
private static final String DEFAULT_KEY_STORE_PASSWORD = "changeit";
private static final String SSL_TRUST_STORE = "javax.net.ssl.trustStore";
private static final String SSL_TRUST_STORE_PASSWORD = "javax.net.ssl.trustStorePassword";
private static final String SSL_TRUST_STORE_TYPE = "javax.net.ssl.trustStoreType";
public static void setSslProperties() {
try {
String sslCertificate= System.getProperty(SSL_CERTIFICATE);
if(StringUtils.isEmpty(sslCertificate)) {
sslCertificate= DEFAULT_SSL_CERTIFICATE;
}
logger.info(" ssl certificate path {}",sslCertificate);
System.setProperty(SSL_TRUST_STORE, createKeyStoreFile(sslCertificate));
System.setProperty(SSL_TRUST_STORE_TYPE, KEY_STORE_TYPE);
System.setProperty(SSL_TRUST_STORE_PASSWORD, DEFAULT_KEY_STORE_PASSWORD);
} catch (Exception e) {
e.printStackTrace();
}
}
private static String createKeyStoreFile(String sslCertificate) throws Exception {
return createKeyStoreFile(createCertificate(sslCertificate)).getPath();
}
private static X509Certificate createCertificate(String sslCertificate) throws Exception {
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
URL url = new File(sslCertificate).toURI().toURL();
if (url == null) {
throw new Exception();
}
try (InputStream certInputStream = url.openStream()) {
return (X509Certificate) certFactory.generateCertificate(certInputStream);
}
}
private static File createKeyStoreFile(X509Certificate rootX509Certificate) throws Exception {
File keyStoreFile = File.createTempFile(KEY_STORE_FILE_PREFIX, KEY_STORE_FILE_SUFFIX);
try (FileOutputStream fos = new FileOutputStream(keyStoreFile.getPath())) {
KeyStore ks = KeyStore.getInstance(KEY_STORE_TYPE, KEY_STORE_PROVIDER);
ks.load(null);
ks.setCertificateEntry("rootCaCertificate", rootX509Certificate);
ks.store(fos, DEFAULT_KEY_STORE_PASSWORD.toCharArray());
}
return keyStoreFile;
}
}