问题标签 [rsocket-java]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
80 浏览

java - 来自一个客户端的并行请求在 RSocket 中串行处理

我希望服务器的所有调用都将并行处理,但事实并非如此。这是一个简单的例子。

RSocket 版本:1.1.0

服务器

客户

在客户端日志中,我们可以看到两个请求同时发送,并且在 10 秒后同时收到两个响应(每个请求在 5 秒内进行)。

在服务器日志中,我们可以看到请求是按顺序执行的,而不是并行执行的。

你能帮我理解这种行为吗?

  1. 为什么我们在 10 秒后收到第一个响应,而不是 5 秒?
  2. 如果我希望并行处理所有请求,如何正确创建服务器?

如果我替换Mono.fromCallableMono.fromFuture(CompletableFuture.supplyAsync(() -> myBusinessLogic(), executorService)),那么它将解决 1。

如果我替换Mono.fromCallableMono.delay(Duration.ZERO).map(ignore -> myBusinessLogic(),那么它将解决 1. 和 2。

如果我替换Mono.fromCallableMono.create(sink -> sink.success(myBusinessLogic())),那么它不会解决我的问题。

客户端日志:

服务器日志:

0 投票
0 回答
99 浏览

angular - rsocket 无法在我的浏览器中运行单独的选项卡

我有一个使用 Angular 的演示应用程序前端。

我正在使用 Rsocket 并且可以通过我的服务器端(spring-boot)为社区恢复。

当我在浏览器中打开一个标签时它运行得很好,但是当我打开一个新标签而当前标签仍然保留时,我遇到了问题。

当前标签

新标签

打开新标签后的当前标签

新选项卡中的 rsocket'state 已连接,而前一个选项卡中的 rsocket'state 已关闭。

=> 如何让我的 Rsocket 在浏览器中运行单独的选项卡?

0 投票
1 回答
80 浏览

java - NoClassDefFoundError - 类路径中缺少依赖项

我的问题

我写了一些java代码。代码在 intellij 中完美运行。
但是当我将它作为 .jar 文件(即 command java -jar app.jar)运行时,我收到错误消息:

我的研究

我一直在 google 和 stackoverflow 上搜索 NoClassDefFoundError 并发现当依赖项丢失时会出现错误。解决方案似乎是我需要将依赖项添加到类路径或 maven 存储库。但我不知道该怎么做(我的 pom.xml 如下所示)。

我的问题

我要做的就是把我的程序变成一个 .jar 文件并通过我的终端(Windows10 操作系统)运行它。我很感激我能得到的所有帮助。

pom.xml

0 投票
1 回答
213 浏览

android - RSocket Android + Spring Boot后端路由错误:没有目的地''的处理程序

我得到ApplicationErrorException: No handler for destination ''试图使用 RSocket 从 android 代码连接到我的 Web 服务器(弹簧启动)。作为一种传输方式,我使用 websockets。

在服务器端我使用:

在客户端我同时使用了:

Ktor 和 Netty 都给了我同样的错误。错误日志如下:

安卓:

这是后端使用的控制器代码:

我用来从 Android 连接的代码'io.rsocket:rsocket-transport-netty:1.1.1'如下:

用于连接Ktor的代码(如此处所述如下:

正如我上面提到的,这两种方法都会导致相同的结果:No handler for destination ''

值得一提的是,当我从另一个 Spring Boot 客户端使用相同的路由时,这个问题就不存在了。

有谁知道我做错了什么?如果有人指出我的错误,我会很高兴。提前致谢。

我在 github 上创建了示例项目来帮助重现此错误:rsocket-android-spring

重现步骤:

  1. 克隆或下载 github 项目rsocket-android-spring
  2. 运行 Spring Boot 服务器
  3. 编辑提供 PC 正确 IP 地址的 hostUrl 变量(!)
  4. 运行 Android 应用程序并单击“发送”按钮

如果您希望在 Android 上从 Netty 切换到 Ktor,您可以使用 MainActivity 代码中的注释方法,但不要忘记使用所需的依赖项build.gradle(存在于那里)。

0 投票
1 回答
149 浏览

spring - RSocket 和 Spring 不处理多个请求

我和 Spring boot 一起玩 RSocket。我想做一个简单的请求-响应示例。例如,我从此链接获取代码:

https://www.baeldung.com/spring-boot-rsocket#request-response

源代码:

https://github.com/eugenp/tutorials/tree/master/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket

当我在没有更改的情况下运行示例代码时,在异常请求期间出现错误。这个错误不是这个问题的重点,但我只想通过 baeldung 显示对原始源的更改。

[reactor-tcp-nio-1] org.springframework.core.log.CompositeLog: [5927a44d-9] HTTP GET "/current/pko" io.rsocket.exceptions.ApplicationErrorException 的 500 服务器错误:没有目标处理程序' ' 在 io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) 抑制:reactor.core.publisher.FluxOnAssembly$OnAssemblyException:在以下站点观察到错误:|_ checkpoint ⇢ 处理程序 com.baeldung.spring .rsocket.client.MarketDataRestController#current(String) [DispatcherHandler] |_ checkpoint ⇢ HTTP GET "/current/pko" [ExceptionHandlingWebHandler] 堆栈跟踪:在 io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) 在io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706) 在 io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640) 在 reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)在 reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) 在 reactor.core.publisher。FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670)在 reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) 在 reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) 在 reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext (FluxMap.java:213) 在 reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) 在 reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) 在 reactor.netty.channel。 FluxReceive.onInboundNext(FluxReceive.java:366) 在 reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) 在 reactor.netty.channel.ChannelOperationsHandler。channelRead(ChannelOperationsHandler.java:96) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext .fireChannelRead(AbstractChannelHandlerContext.java:357) 在 io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) 在 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) 在 io。 netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) 在 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) 在 io.netty.channel.AbstractChannelHandlerContext。fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel .AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeys( NioEventLoop.java:576) 在 io.netty.channel.nio.NioEventLoop。在 io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io 运行(NioEventLoop.java:493) .netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)

所以我从

这个小改动有帮助,不会发生异常。该问题的另一个问题是来自客户端(请求者)的请求由服务器(响应者)一一处理。我创建 SOAPUI REST 项目并在 2 个线程中运行 GET 请求。看起来服务器使用单线程。这不是我期望达到的。

为了方便起见,我将展示整个解决方案。

服务器 :

简单的控制器

在存储库中,我添加Thread.sleep(10000);只是为了模拟长时间运行的操作。

客户

简单的客户端配置:

以及我在 SOAP UI 中使用的简单 REST 控制器

当我运行服务器和客户端时,我会遇到难以理解的行为。通过 SOAP UI,我在 2 个线程中发出单个请求。

在客户端日志中,我得到:

2021-09-01 11:30:14,614 信息 [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController:获取 REST 调用股票:pko
2021-09-01 11:30:14,691 信息[reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController:获取股票的 REST 调用:pko

在服务器中,我得到如下日志:

从第一个镜头记录:

//从客户端获取数据
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: 获取数据: MarketDataRequest(stock=pko)
//记录在调用存储库
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController:控制器线程向前移动:MarketDataRequest(stock= pko)
//记录存储库睡眠线程
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: 存储库线程 go speel ZzzZZ
//存储库完成工作
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository:存储库线程向前移动

服务器仅处理单个调用,并在存储库完成作业时等待。然后以类似的方式处理下一个请求:

2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController:获取数据:MarketDataRequest(stock=pko)
2021-09-01 11:30 :24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController:控制器线程向前移动:MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http -nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository:存储库线程 go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket。 server.MarketDataRepository:存储库线程前进

我不明白为什么服务器要一一处理调用。也许代码中存在一些问题,或者我不理解正确的东西。先感谢您。

0 投票
0 回答
26 浏览

serversocket - RSocket 服务器一般可以有多少个连接?

我知道这是一个愚蠢的问题(因为它确实取决于服务器硬件),但是 RSocket 服务器(Java Spring Boot)通常可以有多少个连接?我问这个问题是因为我对 rsocket 负载均衡器很好奇。rsocket 负载均衡器似乎非常好,这让我想知道负载均衡器是否可以增加您通常能够与其他类型的套接字建立的连接数?

0 投票
0 回答
16 浏览

callback - 客户端可以在 RSocket 中提供对服务器消息的响应吗?

我正在探索 RSocket,我想知道客户端是否有办法同步响应来自服务器的流式消息......基本上是确认或拒绝消息。

见图表说明: 在此处输入图像描述

我正在使用请求流方法,这意味着客户端发送一条初始消息并期望从服务器返回多条消息。

我可以看到如何使用请求通道来异步响应从服务器发送的消息,但我希望找到更同步的东西,以便服务器可以根据上下文处理响应。

0 投票
1 回答
45 浏览

reactive-programming - RSocket 是第五代响应式框架吗?

根据 David Karnok 的分类,第 5 代反应式框架描述如下

Reactive-Streams 需要扩展以支持双向序列(或通道)形式的反应式 IO 操作。

David Karnok 的算子融合

RSocket 定义如下

RSocket 是一种在异步二进制边界上提供 Reactive Streams 语义的应用程序协议。它通过通过单个连接传递的异步消息启用以下对称交互模型: 请求/响应(1 个流) 请求/流(多个有限/无限流) 即发即弃(无响应) 通道(双向流)

那么 RSocket 是第五代响应式框架吗?