我和 Spring boot 一起玩 RSocket。我想做一个简单的请求-响应示例。例如,我从此链接获取代码:
https://www.baeldung.com/spring-boot-rsocket#request-response
源代码:
当我在没有更改的情况下运行示例代码时,在异常请求期间出现错误。这个错误不是这个问题的重点,但我只想通过 baeldung 显示对原始源的更改。
[reactor-tcp-nio-1] org.springframework.core.log.CompositeLog: [5927a44d-9] HTTP GET "/current/pko" io.rsocket.exceptions.ApplicationErrorException 的 500 服务器错误:没有目标处理程序' ' 在 io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) 抑制:reactor.core.publisher.FluxOnAssembly$OnAssemblyException:在以下站点观察到错误:|_ checkpoint ⇢ 处理程序 com.baeldung.spring .rsocket.client.MarketDataRestController#current(String) [DispatcherHandler] |_ checkpoint ⇢ HTTP GET "/current/pko" [ExceptionHandlingWebHandler] 堆栈跟踪:在 io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) 在io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706) 在 io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640) 在 reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)在 reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) 在 reactor.core.publisher。FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)在 reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) 在 reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) 在 reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext (FluxMap.java:213) 在 reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) 在 reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) 在 reactor.netty.channel。 FluxReceive.onInboundNext(FluxReceive.java:366) 在 reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) 在 reactor.netty.channel.ChannelOperationsHandler。channelRead(ChannelOperationsHandler.java:96) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext .fireChannelRead(AbstractChannelHandlerContext.java:357) 在 io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) 在 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) 在 io。 netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) 在 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) 在 io.netty.channel.AbstractChannelHandlerContext。fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel .AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeys( NioEventLoop.java:576) 在 io.netty.channel.nio.NioEventLoop。在 io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io 运行(NioEventLoop.java:493) .netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)
所以我从
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
至
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
这个小改动有帮助,不会发生异常。该问题的另一个问题是来自客户端(请求者)的请求由服务器(响应者)一一处理。我创建 SOAPUI REST 项目并在 2 个线程中运行 GET 请求。看起来服务器使用单线程。这不是我期望达到的。
为了方便起见,我将展示整个解决方案。
服务器 :
简单的控制器
@Controller
public class MarketDataRSocketController {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
logger.info("Getting data for: "+marketDataRequest);
Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
logger.info("Controller thread move forward: "+marketDataRequest);
return result;
}
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
}
在存储库中,我添加Thread.sleep(10000);
只是为了模拟长时间运行的操作。
@Component
public class MarketDataRepository {
Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);
private static final int BOUND = 100;
private Random random = new Random();
public Mono<MarketData> getOne(String stock) {
//return return Mono.just(getMarketDataResponse(stock)); original code from baeldung.
return Mono.just(stock).map(s -> getMarketDataResponse(s));
}
private MarketData getMarketDataResponse(String stock) {
logger.info("Repository thread go speel ZzzZZ");
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
logger.info("Repository thread move forward");
return new MarketData(stock, random.nextInt(BOUND));
}
}
客户
简单的客户端配置:
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.builder()
.rsocketStrategies(rSocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
}
以及我在 SOAP UI 中使用的简单 REST 控制器
@RestController
public class MarketDataRestController {
Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);
private final Random random = new Random();
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
logger.info("Get REST call for stock : "+stock);
return rSocketRequester.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
当我运行服务器和客户端时,我会遇到难以理解的行为。通过 SOAP UI,我在 2 个线程中发出单个请求。
在客户端日志中,我得到:
2021-09-01 11:30:14,614 信息 [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController:获取 REST 调用股票:pko
2021-09-01 11:30:14,691 信息[reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController:获取股票的 REST 调用:pko
在服务器中,我得到如下日志:
从第一个镜头记录:
//从客户端获取数据
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: 获取数据: MarketDataRequest(stock=pko)
//记录在调用存储库
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController:控制器线程向前移动:MarketDataRequest(stock= pko)
//记录存储库睡眠线程
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: 存储库线程 go speel ZzzZZ
//存储库完成工作
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository:存储库线程向前移动
服务器仅处理单个调用,并在存储库完成作业时等待。然后以类似的方式处理下一个请求:
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController:获取数据:MarketDataRequest(stock=pko)
2021-09-01 11:30 :24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController:控制器线程向前移动:MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http -nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository:存储库线程 go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket。 server.MarketDataRepository:存储库线程前进
我不明白为什么服务器要一一处理调用。也许代码中存在一些问题,或者我不理解正确的东西。先感谢您。