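I am playing with RSocket together with Spring Boot and want to build a simple request-response example. As a starting point, I took the code from this link: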

https://www.baeldung.com/spring-boot-rsocket#request-response

Source code:

https://github.com/eugenp/tutorials/tree/master/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket

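When I run the sample code without any changes, I get an exception during the request. This error is not the point of this question; I only show it to explain why I changed the original Baeldung source.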

[reactor-tcp-nio-1] org.springframework.core.log.CompositeLog: [5927a44d-9] 500 Server Error for HTTP GET "/current/pko"
io.rsocket.exceptions.ApplicationErrorException: No handler for destination ''
    at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler com.baeldung.spring.rsocket.client.MarketDataRestController#current(String) [DispatcherHandler]
    |_ checkpoint ⇢ HTTP GET "/current/pko" [ExceptionHandlingWebHandler]
Stack trace:
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
        at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706)
        at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
        at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)
        at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

So I changed the client configuration from

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        return RSocketFactory.connect()
                             .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                             .frameDecoder(PayloadDecoder.ZERO_COPY)
                             .transport(TcpClientTransport.create(7000))
                             .start()
                             .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

to

@Configuration
public class ClientConfiguration {
    

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

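This small change helped: the exception no longer occurs. The other problem, and the actual subject of this question, is that requests from the client (requester) are processed by the server (responder) one at a time. I created a SoapUI REST project and ran the GET request in 2 threads. It looks like the server uses a single thread, which is not what I expected.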

For convenience, here is the whole solution.

Server:

A simple controller:

@Controller
public class MarketDataRSocketController {

    Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        logger.info("Getting data for: "+marketDataRequest);
        Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
        logger.info("Controller thread move forward: "+marketDataRequest);
        return result;
    }

    @MessageExceptionHandler
    public Mono<MarketData> handleException(Exception e) {
        return Mono.just(MarketData.fromException(e));
    }
}

In the repository, I added Thread.sleep(10000); just to simulate a long-running operation.

@Component
public class MarketDataRepository {
    Logger logger = LoggerFactory.getLogger(MarketDataRepository.class);
    private static final int BOUND = 100;
    private Random random = new Random();

    public Mono<MarketData> getOne(String stock) {
        // return Mono.just(getMarketDataResponse(stock)); // original code from Baeldung
        return Mono.just(stock).map(s -> getMarketDataResponse(s));
    }

    private MarketData getMarketDataResponse(String stock) {
        logger.info("Repository thread go speel ZzzZZ");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        logger.info("Repository thread move forward");
        return new MarketData(stock, random.nextInt(BOUND));
    }
}

Client:

A simple client configuration:

@Configuration
public class ClientConfiguration {


    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

And a simple REST controller that I call from SoapUI:

@RestController
public class MarketDataRestController {
    Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);

    private final Random random = new Random();
    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        logger.info("Get REST call for stock : "+stock);
        return rSocketRequester.route("currentMarketData")
                               .data(new MarketDataRequest(stock))
                               .retrieveMono(MarketData.class);
    }
}

When I run the server and the client, I get behavior that I don't understand. From SoapUI, I send a single request from each of 2 threads.

In the client log I get:

2021-09-01 11:30:14,614 INFO [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
2021-09-01 11:30:14,691 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko

On the server I get the following logs.

Logs from the first request:

// request received from the client
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
// logged right after calling the repository
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
// the repository puts the thread to sleep
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
// the repository finishes its work
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward

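The server handles only a single call and waits until the repository finishes its job. Only then is the next request processed, in the same way: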

2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward

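I don't understand why the server processes the calls one by one. Maybe there is a problem in the code, or I am misunderstanding something. Thanks in advance.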

1 Answer

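In Reactor, by default, everything runs on the thread that subscribes. Here that is the Netty event-loop thread serving the connection (reactor-http-nio-3 in your server log), and both requests arrive over the same RSocket connection because the client creates a single RSocketRequester. Calling Thread.sleep blocks that thread, so the second request cannot be handled until the first one returns. If you want to simulate a long-running operation without blocking, you can use the delayElement operator on the Mono (delayElements is the Flux equivalent):

.delayElement(Duration.ofSeconds(10));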
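
Applied to the repository from the question, the idea might look like the sketch below. Since getOne returns a Mono, it uses Mono's single-element operator, delayElement. The class name NonBlockingRepositorySketch, the nested MarketData stand-in, and the method names are assumptions made for illustration. The second method goes beyond the operator mentioned above: it shows a common way to offload work that really does block (fromCallable plus subscribeOn(Schedulers.boundedElastic())), in case the slow step cannot be replaced by a timer.

```java
import java.time.Duration;
import java.util.Random;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class NonBlockingRepositorySketch {

    // Hypothetical stand-in for the MarketData class from the question.
    public static final class MarketData {
        public final String stock;
        public final int currentPrice;

        public MarketData(String stock, int currentPrice) {
            this.stock = stock;
            this.currentPrice = currentPrice;
        }
    }

    private static final int BOUND = 100;
    private final Random random = new Random();

    // Simulated slow lookup: a timer emits the value later, no thread sleeps.
    public Mono<MarketData> getOneSimulated(String stock, Duration delay) {
        return Mono.fromSupplier(() -> new MarketData(stock, random.nextInt(BOUND)))
                   .delayElement(delay);
    }

    // Genuinely blocking lookup: deferred until subscription and run on a
    // boundedElastic worker instead of the subscriber's (event-loop) thread.
    public Mono<MarketData> getOneBlocking(String stock, Duration blockFor) {
        return Mono.fromCallable(() -> {
                       Thread.sleep(blockFor.toMillis()); // stands in for a blocking call
                       return new MarketData(stock, random.nextInt(BOUND));
                   })
                   .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        NonBlockingRepositorySketch repository = new NonBlockingRepositorySketch();
        Duration oneSecond = Duration.ofSeconds(1);

        long start = System.nanoTime();
        // Two "requests" subscribed together: their waiting periods overlap.
        Mono.zip(repository.getOneSimulated("pko", oneSecond),
                 repository.getOneBlocking("pko", oneSecond))
            .block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("Both lookups finished after about " + elapsedMs + " ms");
    }
}
```

In both variants the controller's thread returns immediately after assembling the Mono, so the event loop is free to pick up the next request while the first one is still pending.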

Note: Reactor BlockHound detects and reports such blocking calls.
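
A minimal sketch of how BlockHound might be wired in during development or tests is shown below. It assumes the io.projectreactor.tools:blockhound dependency is on the classpath; the class name BlockHoundSketch and its helper method are hypothetical. On JDK 13 and later, BlockHound also needs the JVM flag -XX:+AllowRedefinitionToAddDeleteMethods.

```java
import java.time.Duration;

import reactor.blockhound.BlockHound;
import reactor.core.publisher.Mono;

public class BlockHoundSketch {

    // Calls Thread.sleep from a Reactor "parallel" timer thread, which is
    // marked as non-blocking. Returns whatever error surfaces, or null.
    public static Throwable runBlockingCallOnNonBlockingThread() {
        try {
            Mono.delay(Duration.ofMillis(10))
                .doOnNext(tick -> {
                    try {
                        Thread.sleep(50); // the kind of call BlockHound reports
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                })
                .block();
            return null;
        } catch (Throwable reported) {
            return reported;
        }
    }

    public static void main(String[] args) {
        // Instruments the JVM; call once, before any reactive code runs.
        BlockHound.install();

        Throwable reported = runBlockingCallOnNonBlockingThread();
        System.out.println(reported == null
                ? "No blocking call was reported"
                : "Reported: " + reported);
    }
}
```

With BlockHound installed, the sleep inside doOnNext should surface as an error that names Thread.sleep, which makes this class of bug visible early instead of showing up as serialized requests.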

Answered 2021-09-01T16:25:50.467