我在 Spring Boot 应用程序中使用 WebClient 调用一个外部的 RESTful Web 服务,间歇性地出现以下异常。
javax.net.ssl.SSLException: SSLEngine closed already SSLEngine closed already
在收到此异常之前,我在日志中看到以下警告。
javax.net.ssl.SSLException:
at io.netty.handler.ssl.SslHandler.wrap (SslHandler.java854)
at io.netty.handler.ssl.SslHandler.wrapAndFlush (SslHandler.java811)
at io.netty.handler.ssl.SslHandler.flush (SslHandler.java792)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0 (AbstractChannelHandlerContext.java750)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush (AbstractChannelHandlerContext.java742)
at io.netty.channel.AbstractChannelHandlerContext.flush (AbstractChannelHandlerContext.java728)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush (CombinedChannelDuplexHandler.java531)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush (ChannelOutboundHandlerAdapter.java125)
at io.netty.channel.CombinedChannelDuplexHandler.flush (CombinedChannelDuplexHandler.java356)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0 (AbstractChannelHandlerContext.java750)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush (AbstractChannelHandlerContext.java765)
at io.netty.channel.AbstractChannelHandlerContext.write (AbstractChannelHandlerContext.java790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush (AbstractChannelHandlerContext.java758)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush (AbstractChannelHandlerContext.java808)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush (DefaultChannelPipeline.java1025)
at io.netty.channel.AbstractChannel.writeAndFlush (AbstractChannel.java294)
at reactor.netty.http.HttpOperations.lambda$send$0 (HttpOperations.java123)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext (MonoFlatMap.java118)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext (FluxMapFuseable.java121)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext (FluxContextStart.java96)
at reactor.core.publisher.Operators$ScalarSubscription.request (Operators.java2344)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.request (FluxContextStart.java125)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request (FluxMapFuseable.java162)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe (MonoFlatMap.java103)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe (FluxMapFuseable.java90)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onSubscribe (FluxContextStart.java90)
at reactor.core.publisher.MonoJust.subscribe (MonoJust.java54)
at reactor.core.publisher.Mono.subscribe (Mono.java4213)
at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete (FluxConcatIterable.java146)
at reactor.core.publisher.FluxConcatIterable.subscribe (FluxConcatIterable.java60)
at reactor.core.publisher.MonoFromFluxOperator.subscribe (MonoFromFluxOperator.java81)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext (MonoFlatMap.java150)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext (FluxSwitchIfEmpty.java67)
at reactor.core.publisher.Operators$MonoSubscriber.complete (Operators.java1782)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete (MonoSingle.java171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete (FluxMapFuseable.java144)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request (FluxJust.java101)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request (FluxMapFuseable.java162)
at reactor.core.publisher.MonoSingle$SingleSubscriber.request (MonoSingle.java94)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set (Operators.java2152)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe (Operators.java2026)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe (MonoSingle.java114)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe (FluxMapFuseable.java90)
at reactor.core.publisher.FluxJust.subscribe (FluxJust.java70)
at reactor.core.publisher.InternalMonoOperator.subscribe (InternalMonoOperator.java64)
at reactor.core.publisher.MonoDefer.subscribe (MonoDefer.java52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange (HttpClientConnect.java442)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange (ReactorNetty.java518)
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.run (PooledConnectionProvider.java633)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute (AbstractEventExecutor.java164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks (SingleThreadEventExecutor.java472)
at io.netty.channel.nio.NioEventLoop.run (NioEventLoop.java500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run (SingleThreadEventExecutor.java989)
at io.netty.util.internal.ThreadExecutorMap$2.run (ThreadExecutorMap.java74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run (FastThreadLocalRunnable.java30)
at java.lang.Thread.run (Thread.java748)
这是完整的堆栈跟踪:
reactor.core.Exceptions$ReactiveException:
at reactor.core.Exceptions.propagate (Exceptions.java393)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet (BlockingSingleSubscriber.java97)
at reactor.core.publisher.Mono.block (Mono.java1680)
at .service.CustomWebClient.callTarget (CustomWebClient.java48)
at .service.MessageServiceImpl.sendMessageToTargetGetAcknowledgement (MessageServiceImpl.java101)
at .service.MessageServiceImpl.transferMessagesFromSourceToTarget (MessageServiceImpl.java53)
at .controller.MessageController.processMessageFromSourceThread3 (MessageController.java71)
at .controller.MessageController$$FastClassBySpringCGLIB$$c25da1d1.invoke (<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke (MethodProxy.java218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint (CglibAopProxy.java771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed (ReflectiveMethodInvocation.java163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed (CglibAopProxy.java749)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction (TransactionAspectSupport.java367)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke (TransactionInterceptor.java118)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed (ReflectiveMethodInvocation.java186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed (CglibAopProxy.java749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept (CglibAopProxy.java691)
at .controller.MessageController$$EnhancerBySpringCGLIB$$7a54eddc.processMessageFromSourceThread3 (<generated>)
at sun.reflect.GeneratedMethodAccessor63.invoke
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java43)
at java.lang.reflect.Method.invoke (Method.java498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run (ScheduledMethodRunnable.java84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run (DelegatingErrorHandlingRunnable.java54)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java511)
at java.util.concurrent.FutureTask.runAndReset (FutureTask.java308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301 (ScheduledThreadPoolExecutor.java180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java294)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java624)
at java.lang.Thread.run (Thread.java748)
Inner exception javax.net.ssl.SSLException handled at reactor.core.Exceptions.propagate:
at io.netty.handler.ssl.SslHandler.wrap (SslHandler.java854)
这是 MessageServiceImpl.java
package service;
import org.hibernate.exception.SQLGrammarException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
/**
 * Service that moves a message from the source system to the target endpoint.
 *
 * <p>Messages are read through {@link MessageRepository}; delivery to the target goes through
 * {@link CustomWebClient}, using the base URL configured under {@code TARGET_ENDPOINT}.
 */
@Service
public class MessageServiceImpl implements MessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);

    @Autowired
    private MessageRepository repository;

    /** Base URL of the target service; the per-message host URL is appended to it. */
    @Value("${TARGET_ENDPOINT}")
    private String baseEndPoint;

    @Autowired
    private CustomWebClient webClient;

    /**
     * Fetches the next message from the source and acknowledges it back to the source.
     *
     * <p>Only the four exception types listed in the catch clauses are handled here; anything
     * else propagates to the caller.
     *
     * @return the message fetched from the source, or the empty {@link Message} created up front
     *         when one of the handled exceptions is raised before the fetch completes
     */
    public Message transferMessagesFromSourceToTarget() {
        Message result = new Message();
        try {
            result = getMessageFromSource();
            //some code....
            sendAcknowledgementToSource(result);
            //some code....
        } catch (HttpServerErrorException | HttpClientErrorException e) {
            //somecode
        } catch (InvalidDataAccessResourceUsageException | SQLGrammarException e) {
            //somecode
        }
        return result;
    }

    /**
     * Reads the next source message via the repository's stored-procedure call.
     *
     * @return whatever the repository returns
     */
    private Message getMessageFromSource() {
        return repository.getSourceMessageFromStroredProcedureCall();
    }

    /**
     * Posts the message to the target and records the HTTP status of the reply on it.
     *
     * @param message message to deliver; its host URL is appended to the configured base endpoint
     * @return the same {@code message} instance, with its HTTP status updated
     */
    private Message sendMessageToTargetGetAcknowledgement(Message message) {
        String targetUrl = baseEndPoint + message.getHostURL();
        ResponseEntity<String> targetResponse = webClient.callTarget(targetUrl, message);
        message.setHttpStatus(targetResponse.getStatusCodeValue());
        return message;
    }
}
这是 CustomWebClient 组件类
package service;
import XYZ.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
/
@Component
public class CustomWebClient {
private final WebClient webClient;
/**
* Constructor to autowire WebClient instance
* @param webClient webclient instance
*/
@Autowired
public CustomWebClient(WebClient webClient){
this.webClient = webClient;
}
public ResponseEntity<String> callTarget(String baseEndPoint, Message message) {
String response = webClient.post().uri(baseEndPoint)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)
.body(BodyInserters.fromValue(message.getMessage()))
.retrieve()
.onStatus(HttpStatus::is5xxServerError, serverError ->
Mono.error(new HttpServerErrorException(serverError.statusCode())))
.onStatus(HttpStatus::is4xxClientError, clientError ->
Mono.error(new HttpClientErrorException(clientError.statusCode())))
.bodyToMono(String.class).block();
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
在下面的配置中,我在 SSLContext 中添加了信任库。
package xyz.conf;
import static java.nio.charset.StandardCharsets.US_ASCII;
import xyz.constants.Constants;
import xyz.service.ResourceReader;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import javax.security.auth.x500.X500Principal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
/**
 * Webclient configuration.
 *
 * <p>Builds the application's {@link WebClient}: TLS trust is limited to the certificates parsed
 * from the PEM chain supplied by {@link ResourceReader}, HTTP Basic credentials are attached to
 * every request, and connections come from a dedicated pool that discards idle connections.
 */
@Configuration
public class WebClientConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebClientConfig.class);

    /** Connect timeout and response timeout, in milliseconds. */
    private static final int TIMEOUT_MILLIS = 30000;

    /**
     * Upper bound on pooled connections.
     * NOTE(review): 500 is believed to match Reactor Netty's default HTTP pool size - confirm.
     */
    private static final int POOL_MAX_CONNECTIONS = 500;

    private final ResourceReader resourceReader;
    private String user;
    private String pass;
    private long poolMaxIdleTimeMillis = 3000L;

    /**
     * Constructor to autowire ResourceReader
     * @param resourceReader ResourceReader object
     */
    @Autowired
    public WebClientConfig(ResourceReader resourceReader) {
        this.resourceReader = resourceReader;
    }

    @Value("${USER}")
    public void setuser(String user) {
        this.user = user;
    }

    @Value("${PASSWD}")
    public void setpass(String pass) {
        this.pass = pass;
    }

    /**
     * Sets how long a pooled connection may stay idle before it is discarded.
     *
     * <p>Bound to the {@code reactor.netty.pool.maxIdleTime} key so the value in the
     * application properties is actually applied; falls back to 3000 when the key is absent.
     *
     * @param poolMaxIdleTimeMillis idle limit in milliseconds
     */
    @Value("${reactor.netty.pool.maxIdleTime:3000}")
    public void setPoolMaxIdleTimeMillis(long poolMaxIdleTimeMillis) {
        this.poolMaxIdleTimeMillis = poolMaxIdleTimeMillis;
    }

    /**
     * Creates the shared {@link WebClient} bean.
     *
     * @return a WebClient using the custom trust store, 30 second connect/response timeouts,
     *         an idle-evicting connection pool and default Basic-auth headers
     * @throws IOException if the SSL context cannot be built or the key store cannot be loaded
     * @throws KeyManagementException declared for signature compatibility
     * @throws NoSuchAlgorithmException if the trust manager or key store algorithm is unavailable
     * @throws KeyStoreException if the trust store cannot be created or populated
     * @throws CertificateException if a certificate in the PEM chain cannot be parsed
     */
    @Bean
    WebClient getWebClient() throws IOException, KeyManagementException, NoSuchAlgorithmException,
            KeyStoreException, CertificateException {
        LOGGER.info("WebClient configuration started.");

        TrustManagerFactory trustManagerFactory =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(getTrustStore(resourceReader.readTrustStoreConfig()));

        // Only the Netty SslContext is handed to the client; the JDK SSLContext that used to be
        // created here was never used and has been dropped.
        SslContext sslContext = SslContextBuilder
                .forClient()
                .trustManager(trustManagerFactory)
                .build();

        // Idle pooled connections can be closed by the remote peer or an intermediary; writing
        // on such a connection is a known cause of "SSLEngine closed already". Evicting
        // connections that have been idle longer than the limit avoids reusing them.
        ConnectionProvider connectionProvider = ConnectionProvider.builder("webclient-pool")
                .maxConnections(POOL_MAX_CONNECTIONS)
                .maxIdleTime(Duration.ofMillis(poolMaxIdleTimeMillis))
                .build();

        HttpClient httpClient = HttpClient.create(connectionProvider)
                .secure(spec -> spec.sslContext(sslContext))
                .tcpConfiguration(tcpClient ->
                        tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT_MILLIS))
                .responseTimeout(Duration.ofMillis(TIMEOUT_MILLIS));
        LOGGER.info("WebClient configured successfully..");

        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .defaultHeaders(header -> header.setBasicAuth(user, pass))
                .build();
    }

    /**
     * Builds an in-memory JKS trust store holding every certificate found in the PEM text.
     *
     * @param certificateChainPem PEM text containing one or more certificates
     * @return a key store with one entry per certificate, aliased by its RFC 2253 subject name
     */
    private KeyStore getTrustStore(String certificateChainPem)
            throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        // A null stream initialises an empty store.
        keyStore.load(null, null);
        for (X509Certificate certificate : readCertificateChain(certificateChainPem)) {
            X500Principal principal = certificate.getSubjectX500Principal();
            keyStore.setCertificateEntry(principal.getName("RFC2253"), certificate);
        }
        return keyStore;
    }

    /**
     * Extracts every certificate matched by {@code Constants.CERT_PATTERN} from the PEM text.
     *
     * @param certificateChainPem PEM text to scan
     * @return the parsed certificates in the order they appear; empty when nothing matches
     * @throws CertificateException if a matched block is not a valid X.509 certificate
     */
    private List<X509Certificate> readCertificateChain(String certificateChainPem)
            throws CertificateException {
        List<X509Certificate> certificates = new ArrayList<>();
        Matcher matcher = Constants.CERT_PATTERN.matcher(certificateChainPem);
        CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        int start = 0;
        // Group 1 of each match is decoded as the Base64 certificate body.
        while (matcher.find(start)) {
            byte[] buffer = base64Decode(matcher.group(1));
            certificates.add((X509Certificate) certificateFactory
                    .generateCertificate(new ByteArrayInputStream(buffer)));
            start = matcher.end();
        }
        return certificates;
    }

    /**
     * Decodes Base64 text with the MIME decoder.
     *
     * @param base64 Base64 text
     * @return the decoded bytes
     */
    private byte[] base64Decode(String base64) {
        return java.util.Base64.getMimeDecoder().decode(base64.getBytes(US_ASCII));
    }
}
下面是应用程序属性文件
# Log level for org.springframework loggers.
logging.level.org.springframework=INFO
# Base URL of the target service (injected into MessageServiceImpl via @Value).
TARGET_ENDPOINT=XYZ
# HTTP Basic credentials injected into WebClientConfig via @Value.
# NOTE(review): USER is also a common OS environment variable, and Spring Boot resolves
# environment variables ahead of application.properties - confirm the intended value wins.
USER=XYZ
PASSWD=XYZ
# Maximum idle time for pooled Reactor Netty connections (presumably milliseconds - confirm).
# NOTE(review): this key is not bound automatically by Spring Boot; it only takes effect if
# application code reads it or it is passed as a JVM system property (-D...) - verify.
reactor.netty.pool.maxIdleTime=3000
我已经尝试了下面链接中给出的解决方案,但问题依然存在: https://github.com/reactor/reactor-netty/issues/782
以下是版本详细信息:-
Springboot 版本:2.3.3.RELEASE