6

我在springboot应用程序中使用webclient来调用一个外部的restful webservice。间歇性地得到这个异常。

javax.net.ssl.SSLException: SSLEngine closed already SSLEngine closed already

在收到此异常之前,我在日志中看到以下警告。

javax.net.ssl.SSLException:
   at io.netty.handler.ssl.SslHandler.wrap (SslHandler.java854)
   at io.netty.handler.ssl.SslHandler.wrapAndFlush (SslHandler.java811)
   at io.netty.handler.ssl.SslHandler.flush (SslHandler.java792)
   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0 (AbstractChannelHandlerContext.java750)
   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush (AbstractChannelHandlerContext.java742)
   at io.netty.channel.AbstractChannelHandlerContext.flush (AbstractChannelHandlerContext.java728)
   at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush (CombinedChannelDuplexHandler.java531)
   at io.netty.channel.ChannelOutboundHandlerAdapter.flush (ChannelOutboundHandlerAdapter.java125)
   at io.netty.channel.CombinedChannelDuplexHandler.flush (CombinedChannelDuplexHandler.java356)
   at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0 (AbstractChannelHandlerContext.java750)
   at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush (AbstractChannelHandlerContext.java765)
   at io.netty.channel.AbstractChannelHandlerContext.write (AbstractChannelHandlerContext.java790)
   at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush (AbstractChannelHandlerContext.java758)
   at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush (AbstractChannelHandlerContext.java808)
   at io.netty.channel.DefaultChannelPipeline.writeAndFlush (DefaultChannelPipeline.java1025)
   at io.netty.channel.AbstractChannel.writeAndFlush (AbstractChannel.java294)
   at reactor.netty.http.HttpOperations.lambda$send$0 (HttpOperations.java123)
   at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext (MonoFlatMap.java118)
   at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext (FluxMapFuseable.java121)
   at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext (FluxContextStart.java96)
   at reactor.core.publisher.Operators$ScalarSubscription.request (Operators.java2344)
   at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.request (FluxContextStart.java125)
   at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request (FluxMapFuseable.java162)
   at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe (MonoFlatMap.java103)
   at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe (FluxMapFuseable.java90)
   at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onSubscribe (FluxContextStart.java90)
   at reactor.core.publisher.MonoJust.subscribe (MonoJust.java54)
   at reactor.core.publisher.Mono.subscribe (Mono.java4213)
   at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete (FluxConcatIterable.java146)
   at reactor.core.publisher.FluxConcatIterable.subscribe (FluxConcatIterable.java60)
   at reactor.core.publisher.MonoFromFluxOperator.subscribe (MonoFromFluxOperator.java81)
   at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext (MonoFlatMap.java150)
   at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext (FluxSwitchIfEmpty.java67)
   at reactor.core.publisher.Operators$MonoSubscriber.complete (Operators.java1782)
   at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete (MonoSingle.java171)
   at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete (FluxMapFuseable.java144)
   at reactor.core.publisher.FluxJust$WeakScalarSubscription.request (FluxJust.java101)
   at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request (FluxMapFuseable.java162)
   at reactor.core.publisher.MonoSingle$SingleSubscriber.request (MonoSingle.java94)
   at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set (Operators.java2152)
   at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe (Operators.java2026)
   at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe (MonoSingle.java114)
   at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe (FluxMapFuseable.java90)
   at reactor.core.publisher.FluxJust.subscribe (FluxJust.java70)
   at reactor.core.publisher.InternalMonoOperator.subscribe (InternalMonoOperator.java64)
   at reactor.core.publisher.MonoDefer.subscribe (MonoDefer.java52)
   at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange (HttpClientConnect.java442)
   at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange (ReactorNetty.java518)
   at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.run (PooledConnectionProvider.java633)
   at io.netty.util.concurrent.AbstractEventExecutor.safeExecute (AbstractEventExecutor.java164)
   at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks (SingleThreadEventExecutor.java472)
   at io.netty.channel.nio.NioEventLoop.run (NioEventLoop.java500)
   at io.netty.util.concurrent.SingleThreadEventExecutor$4.run (SingleThreadEventExecutor.java989)
   at io.netty.util.internal.ThreadExecutorMap$2.run (ThreadExecutorMap.java74)
   at io.netty.util.concurrent.FastThreadLocalRunnable.run (FastThreadLocalRunnable.java30)
   at java.lang.Thread.run (Thread.java748)

这是完整的堆栈跟踪:

reactor.core.Exceptions$ReactiveException:
       at reactor.core.Exceptions.propagate (Exceptions.java393)
       at reactor.core.publisher.BlockingSingleSubscriber.blockingGet (BlockingSingleSubscriber.java97)
       at reactor.core.publisher.Mono.block (Mono.java1680)
       at .service.CustomWebClient.callTarget (CustomWebClient.java48)
       at .service.MessageServiceImpl.sendMessageToTargetGetAcknowledgement (MessageServiceImpl.java101)
       at .service.MessageServiceImpl.transferMessagesFromSourceToTarget (MessageServiceImpl.java53)
       at .controller.MessageController.processMessageFromSourceThread3 (MessageController.java71)
       at .controller.MessageController$$FastClassBySpringCGLIB$$c25da1d1.invoke (<generated>)
       at org.springframework.cglib.proxy.MethodProxy.invoke (MethodProxy.java218)
       at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint (CglibAopProxy.java771)
       at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed (ReflectiveMethodInvocation.java163)
       at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed (CglibAopProxy.java749)
       at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction (TransactionAspectSupport.java367)
       at org.springframework.transaction.interceptor.TransactionInterceptor.invoke (TransactionInterceptor.java118)
       at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed (ReflectiveMethodInvocation.java186)
       at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed (CglibAopProxy.java749)
       at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept (CglibAopProxy.java691)
       at .controller.MessageController$$EnhancerBySpringCGLIB$$7a54eddc.processMessageFromSourceThread3 (<generated>)
       at sun.reflect.GeneratedMethodAccessor63.invoke
       at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java43)
       at java.lang.reflect.Method.invoke (Method.java498)
       at org.springframework.scheduling.support.ScheduledMethodRunnable.run (ScheduledMethodRunnable.java84)
       at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run (DelegatingErrorHandlingRunnable.java54)
       at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java511)
       at java.util.concurrent.FutureTask.runAndReset (FutureTask.java308)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301 (ScheduledThreadPoolExecutor.java180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java294)
       at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java624)
       at java.lang.Thread.run (Thread.java748)
    Inner exception javax.net.ssl.SSLException handled at reactor.core.Exceptions.propagate:
       at io.netty.handler.ssl.SslHandler.wrap (SslHandler.java854)

这是 MessageServiceImpl.java

package service;
import org.hibernate.exception.SQLGrammarException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;


@Service
public class MessageServiceImpl implements MessageService {

  private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);

  @Autowired
  private MessageRepository repository;

  @Value("${TARGET_ENDPOINT}")
  private String baseEndPoint;

  @Autowired
  private CustomWebClient webClient;

  
  public Message transferMessagesFromSourceToTarget()
       {

    Message message = new Message();
    try {

      message = getMessageFromSource();
      //some code....   
      
      sendAcknowledgementToSource(message);
      //some code....
     

    } catch (HttpServerErrorException | HttpClientErrorException e) {
      //somecode

    } catch (InvalidDataAccessResourceUsageException | SQLGrammarException  e) {
      //somecode
    }

    return message;

  }

 
  private Message getMessageFromSource() {
    return repository.getSourceMessageFromStroredProcedureCall();
  }

  
  private Message sendMessageToTargetGetAcknowledgement(Message message) {
    
    ResponseEntity<String> response = null;
    
    response = webClient.callTarget(baseEndPoint+ message.getHostURL(),message);
   
    message.setHttpStatus(response.getStatusCodeValue());
    
    return message;
  }

}

这是 CustomWebClient 组件类

package service;

import XYZ.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

/
@Component
public class CustomWebClient {

  private final WebClient webClient;

  /**
   * Constructor to autowire WebClient instance
   * @param webClient webclient instance
   */
  @Autowired
  public CustomWebClient(WebClient webClient){
    this.webClient = webClient;
  }

  
  public ResponseEntity<String> callTarget(String baseEndPoint, Message message) {
    String response = webClient.post().uri(baseEndPoint)
        .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)
        .body(BodyInserters.fromValue(message.getMessage()))
        .retrieve()
        .onStatus(HttpStatus::is5xxServerError, serverError ->
            Mono.error(new HttpServerErrorException(serverError.statusCode())))
        .onStatus(HttpStatus::is4xxClientError, clientError ->
            Mono.error(new HttpClientErrorException(clientError.statusCode())))
        .bodyToMono(String.class).block();
    return new ResponseEntity<>(response, HttpStatus.OK);
  }
}

在下面的配置中,我在 SSLContext 中添加了信任库。

package xyz.conf;

import static java.nio.charset.StandardCharsets.US_ASCII;

import xyz.constants.Constants;
import xyz.service.ResourceReader;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import javax.security.auth.x500.X500Principal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;

/**
 * Webclient configuration
 */
@Configuration
public class WebClientConfig {

  private String user;

  private String pass;

  private ResourceReader resourceReader;

  /**
   * Constructor to autowire ResourceReader
   * @param resourceReader ResourceReader object
   */
  @Autowired
  public WebClientConfig(ResourceReader resourceReader) {
    this.resourceReader = resourceReader;
  }

  @Value("${USER}")
  public void setuser(String user) {
    this.user = user;
  }

  @Value("${PASSWD}")
  public void setpass(String pass) {
    this.pass = pass;
  }

  private static final Logger LOGGER = LoggerFactory.getLogger(WebClientConfig.class);

  @Bean
  WebClient getWebClient () throws IOException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException, CertificateException {
    LOGGER.info("WebClient configuration started.");
    TrustManagerFactory trustManagerFactory =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    KeyStore trustStore = getTrustStore(resourceReader.readTrustStoreConfig());

    trustManagerFactory.init(trustStore);
    SSLContext context = SSLContext.getInstance("TLSv1.2");
    context.init(null, trustManagerFactory.getTrustManagers(), new SecureRandom());

    SslContext sslContext = SslContextBuilder
        .forClient()
        .trustManager(trustManagerFactory)
        .build();

    HttpClient httpClient = HttpClient.create().secure( t -> t.sslContext(sslContext) );
    httpClient = httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000));
    httpClient = httpClient.responseTimeout(Duration.ofMillis(30000));

    LOGGER.info("WebClient configured successfully..");
    ReactorClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(httpClient);
    return WebClient.builder().clientConnector(reactorClientHttpConnector)
        .defaultHeaders(header -> header.setBasicAuth(user, pass))
        .build();

  }

  private KeyStore getTrustStore(String certificateChainPem)
      throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException {

    KeyStore keyStore = null;
    keyStore = KeyStore.getInstance("JKS");
    keyStore.load(null, null);

    List<X509Certificate> certificates = readCertificateChain(certificateChainPem);
    for (X509Certificate certificate : certificates) {
      X500Principal principal = certificate.getSubjectX500Principal();
      keyStore.setCertificateEntry(principal.getName("RFC2253"), certificate);
    }

    return keyStore;
  }

  private List<X509Certificate> readCertificateChain(String certificateChainPem)
      throws CertificateException {
    List<X509Certificate> certificates = new ArrayList<>();

    Matcher matcher = Constants.CERT_PATTERN.matcher(certificateChainPem);
    CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");

    int start = 0;
    while (matcher.find(start)) {
      byte[] buffer = base64Decode(matcher.group(1));
      certificates.add((X509Certificate) certificateFactory
          .generateCertificate(new ByteArrayInputStream(buffer)));
      start = matcher.end();
    }

    return certificates;
  }

  private byte[] base64Decode(String base64) {
    return java.util.Base64.getMimeDecoder().decode(base64.getBytes(US_ASCII));
  }    

}

下面是应用程序属性文件

logging.level.org.springframework=INFO

TARGET_ENDPOINT=XYZ
USER=XYZ
PASSWD=XYZ

#maxidle timeout for netty pool
reactor.netty.pool.maxIdleTime=3000

我已经尝试通过下面链接中给出的解决方案来解决这个问题,但它没有用。 https://github.com/reactor/reactor-netty/issues/782

以下是版本详细信息:-

Springboot 版本:2.3.3.RELEASE

4

0 回答 0