我是响应式编程和 webflux 的新手,我正在评估从带有 WebMVC 的 Servlet 堆栈上的 Spring Data Elasticsearch 应用程序迁移到带有 Spring Webflux 的响应式堆栈。
我开发了两个相同的简单 Spring Boot 应用程序,它们可以使用 Spring Data Elasticsearch Repositories 执行 CRUD 操作。
测试是用户将文档索引/保存到 Elasticsearch。负载测试是 500 个并发用户,启动时间 20 秒,每个用户 20 次迭代(总共 10000 个文档)
我期待 Netty 上的 Webflux 优于 Tomcat 上的 MVC,尤其是在有更多并发用户的情况下。但结果却相反。Netty 的响应时间几乎是 tomcat 的两倍,我需要增加响应式客户端中的 maxConnection 队列,因为我收到了 ReadTimeoutExceptions。那么我做错了什么?
问题:
- Netty 上的 Webflux 不应该能够更好地处理更多并发用户吗?
- 为什么响应时间如此之长?...并且 netty 的吞吐量较低?
- 我是否需要以不同的方式配置响应式客户端以获得更好的性能?
- 拥有数百个 NIO 线程的 Tomcat 能否只处理更多请求并且比 Netty 事件循环更快?
这些应用程序具有以下堆栈:
Spring Web MVC:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
Tomcat (spring-boot-starter-tomcat:jar:2.4.2:compile)
RestHighLevelClient 用于对 Elasticsearch 的请求
@Configuration public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration { @Value("${elasticsearch.host:localhost}") private String host; @Value("${elasticsearch.http.port:9200}") private int port; @Override @Bean public RestHighLevelClient elasticsearchClient() { final ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo(getHostAndPort()) .build(); return RestClients.create(clientConfiguration).rest(); } private String getHostAndPort(){ return host +":"+ port; } }
控制器:
@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {
PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();
return new ResponseEntity<>(dto, HttpStatus.CREATED);
}
服务:
public PersonDocumentDto indexGeneratedPersonDocument(){
PersonDocument personDocument = personGenerator.generatePersonDoc();
PersonDocumentDto personDocumentDto = new PersonDocumentDto();
try {
personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
LOGGER.debug("Document indexed!");
} catch (Exception e) {
LOGGER.error("Unable to index document!",e);
}
return personDocumentDto;
}
Spring Webflux:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<reactor-test.version>3.4.2</reactor-test.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
Netty (spring-boot-starter-reactor-netty:jar:2.4.2:compile)
ReactiveElasticSearchClient 用于对 Elasticsearch 的请求
@Configuration public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration { @Value("${elasticsearch.host:localhost}") private String host; @Value("${elasticsearch.http.port:9200}") private int port; @Override @Bean public ReactiveElasticsearchClient reactiveElasticsearchClient() { ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo(getHostAndPort()) .withWebClientConfigurer(webClient -> { ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() .maxInMemorySize(-1)) .build(); String connectionProviderName = "myConnectionProvider"; int maxConnections = 1000; HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName, maxConnections)); return webClient .mutate() .clientConnector(new ReactorClientHttpConnector(httpClient)) .exchangeStrategies(exchangeStrategies) .build(); }) .build(); return ReactiveRestClients.create(clientConfiguration); } private String getHostAndPort(){ return host +":"+ port; }
}
处理程序:
public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
return this.service.indexGeneratedPersonDocument()
.flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
.onErrorResume(WebClientRequestException.class, e -> ServerResponse
.badRequest()
.bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));
}
服务:
public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){
return personGenerator.generatePersonDocument()
.flatMap(this.repository::save)
.map(EntityDtoUtil::toDto)
.doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}
MVC ResponseTimesPercentiles: 500 个用户,20 次迭代,总共 10000 个文档
Webflux ResponseTimesPercentiles: 500 个用户,20 次迭代,总共 10000 个文档