1

我正在尝试将 WebFlux 与 RSocket 一起使用,示例应用程序具有服务器和客户端应用程序。都在 WebFlux 和 RSocket 上运行,我的 rsocket 通信类型是请求流。客户端-服务器应用程序对于几个并发请求运行得非常好,但是当我用 1000qps 和 8 个线程加载测试时,请求开始挂起。在调查下面的示例代码通过负载测试。


工作样品

RSocketClientConfig.java

public class RSocketClientConfig {

    @Bean
    RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
            RSocketClientProperties clientProp) {

        RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();

        rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
        return rsocketRequester;
    }


}

客户端.java

@Service
public class PersonRSocketClient {

    @Autowired
    private RSocketRequester personClient;

    public Flux<Person> list() {
        return personClient.route("person").retrieveFlux(Person.class);
    }

}

不工作

RSocketClientConfig.java

public class RSocketClientConfig {

    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
        RSocketClientProperties clientProp) {

        
        Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort());

        return rsocketRequester;
    }
}

客户端.java

@Service
public class PersonRSocketClient {

    @Autowired
    private Mono<RSocketRequester> personClient;

    public Flux<Person> list() {
        return personClient
                .flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
    }

}

如何正确地将请求流映射到通量

4

0 回答 0