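I am trying to use WebFlux together with RSocket. My sample setup has a server application and a client application, both running on WebFlux and RSocket, and the RSocket interaction model I use is request-stream. The client and server work fine with a few concurrent requests, but when I load test at 1000 qps with 8 threads, requests start to hang. While investigating, I found that the following sample passes the load test.
Working sample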
RSocketClientConfig.java
public class RSocketClientConfig {

    @Bean
    RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
            RSocketClientProperties clientProp) {
        // Connect once at startup and block until the requester is available.
        RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();
        // Log when the underlying connection closes.
        rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
        return rsocketRequester;
    }
}
Client.java
@Service
public class PersonRSocketClient {

    @Autowired
    private RSocketRequester personClient;

    // Request-stream call on the single, already connected requester.
    public Flux<Person> list() {
        return personClient.route("person").retrieveFlux(Person.class);
    }
}
Not working
RSocketClientConfig.java
public class RSocketClientConfig {

    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
            RSocketClientProperties clientProp) {
        // Returns the Mono itself; no connection is made until something subscribes to it.
        Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort());
        return rsocketRequester;
    }
}
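Client.java
@Service
public class PersonRSocketClient {

    @Autowired
    private Mono<RSocketRequester> personClient;

    // Each call subscribes to the Mono<RSocketRequester> before issuing the request-stream call.
    public Flux<Person> list() {
        return personClient
                .flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
    }
}
How do I correctly map a request-stream to a Flux?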
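One possible explanation, stated as an assumption rather than a confirmed diagnosis: if the Mono<RSocketRequester> is cold, every call to list() in the second sample may trigger its own connection attempt, whereas the first sample connects once and reuses the requester. The sketch below is only a plain-Java analogy of that difference and does not use Reactor or RSocket. The names ConnectionSharingSketch, coldConnector, cached, and connectionsOpened are hypothetical and exist only for illustration. In Reactor, the analogous way to share one result among subscribers would be the Mono.cache() operator.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ConnectionSharingSketch {

    // Hypothetical stand-in for a cold connection source:
    // every get() "opens" a new connection and bumps the counter.
    static Supplier<String> coldConnector(AtomicInteger opened) {
        return () -> "connection-" + opened.incrementAndGet();
    }

    // Lazily memoizes the first result so that later callers reuse it.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean resolved;

            @Override
            public synchronized T get() {
                if (!resolved) {
                    value = source.get();
                    resolved = true;
                }
                return value;
            }
        };
    }

    // Simulates `requests` calls that each resolve the connection first,
    // and returns how many connections were opened in total.
    static int connectionsOpened(boolean share, int requests) {
        AtomicInteger opened = new AtomicInteger();
        Supplier<String> connector = coldConnector(opened);
        if (share) {
            connector = cached(connector);
        }
        for (int i = 0; i < requests; i++) {
            connector.get();
        }
        return opened.get();
    }

    public static void main(String[] args) {
        System.out.println("per-request: " + connectionsOpened(false, 1000)); // prints "per-request: 1000"
        System.out.println("shared: " + connectionsOpened(true, 1000));       // prints "shared: 1"
    }
}
```

If the assumption holds, the per-request variant corresponds to the "Not working" sample and the shared variant to the "Working sample". Whether this is what actually causes the hang under load would still need to be verified, for example by counting open TCP connections on the server during the test.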