I'm new to reactive programming and WebFlux, and I'm evaluating a migration of a Spring Data Elasticsearch application from the Servlet stack with WebMVC to the reactive stack with Spring WebFlux.

I built two otherwise identical, simple Spring Boot applications that perform CRUD operations through Spring Data Elasticsearch repositories.

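The test has each user index (save) a document into Elasticsearch. The load test runs 500 concurrent users with a ramp-up time of 20 seconds and 20 iterations per user (10000 documents in total).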
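To make the load profile concrete, here is a minimal sketch of the arithmetic behind those numbers. The class and method names are hypothetical, and the per-second figure assumes the load tool starts users linearly over the ramp-up period, which is an assumption and not something I measured.

```java
public class LoadProfile {

    // Total number of index requests issued over the whole run.
    static int totalRequests(int users, int iterationsPerUser) {
        return users * iterationsPerUser;
    }

    // Average number of virtual users started per second,
    // assuming a linear ramp-up (assumption, see above).
    static double usersStartedPerSecond(int users, int rampUpSeconds) {
        return (double) users / rampUpSeconds;
    }

    public static void main(String[] args) {
        int users = 500;
        int iterationsPerUser = 20;
        int rampUpSeconds = 20;

        System.out.println("total requests: "
                + totalRequests(users, iterationsPerUser));
        System.out.println("users started per second: "
                + usersStartedPerSecond(users, rampUpSeconds));
    }
}
```

With the values from my test this gives 10000 requests and, under the linear ramp-up assumption, 25 new users per second until all 500 are active.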

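I expected WebFlux on Netty to outperform MVC on Tomcat, especially with more concurrent users, but the results show the opposite. Response times on Netty are almost twice those on Tomcat, and I had to increase the maxConnection queue in the reactive client because I was getting ReadTimeoutExceptions. So what am I doing wrong?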

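Questions:

  1. Shouldn't WebFlux on Netty handle a larger number of concurrent users better?
  2. Why are the response times so long, and why is the throughput lower on Netty?
  3. Do I need to configure the reactive client differently to get better performance?
  4. Can Tomcat, with its hundreds of NIO threads, simply handle more requests and be faster than the Netty event loop?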

The applications have the following stacks:

Spring Web MVC:

<properties>
    <java.version>1.8</java.version>
    <spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
    <spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
    <spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
    <lombok.version>1.18.16</lombok.version>
    <jfairy.version>0.5.9</jfairy.version>
    <elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
  • Tomcat (spring-boot-starter-tomcat:jar:2.4.2:compile)

  • RestHighLevelClient for requests to Elasticsearch

     @Configuration
     public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration {

         @Value("${elasticsearch.host:localhost}")
         private String host;

         @Value("${elasticsearch.http.port:9200}")
         private int port;

         @Override
         @Bean
         public RestHighLevelClient elasticsearchClient() {

             final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                     .connectedTo(getHostAndPort())
                     .build();

             return RestClients.create(clientConfiguration).rest();
         }

         private String getHostAndPort() {
             return host + ":" + port;
         }
     }
    

Controller:

@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {

    PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();

    return new ResponseEntity<>(dto, HttpStatus.CREATED);
}

Service:

public PersonDocumentDto indexGeneratedPersonDocument(){

    PersonDocument personDocument = personGenerator.generatePersonDoc();
    PersonDocumentDto personDocumentDto = new PersonDocumentDto();

    try {
        personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
        LOGGER.debug("Document indexed!");
    } catch (Exception e) {
        LOGGER.error("Unable to index document!",e);
    }

    return personDocumentDto;

}

Spring Webflux:

<properties>
    <java.version>1.8</java.version>
    <spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
    <spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
    <spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
    <reactor-test.version>3.4.2</reactor-test.version>
    <lombok.version>1.18.16</lombok.version>
    <jfairy.version>0.5.9</jfairy.version>
    <elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
  • Netty (spring-boot-starter-reactor-netty:jar:2.4.2:compile)

  • ReactiveElasticsearchClient for requests to Elasticsearch

     @Configuration
     public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration {

         @Value("${elasticsearch.host:localhost}")
         private String host;

         @Value("${elasticsearch.http.port:9200}")
         private int port;

         @Override
         @Bean
         public ReactiveElasticsearchClient reactiveElasticsearchClient() {
             ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                     .connectedTo(getHostAndPort())
                     .withWebClientConfigurer(webClient -> {
                         ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                                 .codecs(configurer -> configurer.defaultCodecs()
                                         .maxInMemorySize(-1))
                                 .build();
                         String connectionProviderName = "myConnectionProvider";
                         int maxConnections = 1000;
                         HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName, maxConnections));

                         return webClient
                                 .mutate()
                                 .clientConnector(new ReactorClientHttpConnector(httpClient))
                                 .exchangeStrategies(exchangeStrategies)
                                 .build();
                     })
                     .build();

             return ReactiveRestClients.create(clientConfiguration);
         }

         private String getHostAndPort() {
             return host + ":" + port;
         }
     }

Handler:

public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
    return this.service.indexGeneratedPersonDocument()
            .flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
            .onErrorResume(WebClientRequestException.class, e -> ServerResponse
                    .badRequest()
                    .bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));

}

Service:

public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){

    return personGenerator.generatePersonDocument()
            .flatMap(this.repository::save)
            .map(EntityDtoUtil::toDto)
            .doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}

MVC response time percentiles: 500 users, 20 iterations, 10000 documents in total

WebFlux response time percentiles: 500 users, 20 iterations, 10000 documents in total
