问题标签 [rsocket]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
10087 浏览

websocket - Websockets 与反应式套接字

我最近遇到了一个术语“反应式套接字”。到目前为止,我一直认为 websockets 是完全成熟的异步风格的方式。

那么什么是反应式套接字。

这个链接 ( http://rsocket.io/ ) 甚至谈到了 websockets 的比较。

0 投票
2 回答
1661 浏览

java - 如何使用 RSocket 创建文件发送客户端/服务器?

我似乎在RSocket上找不到任何资源/教程,除了在 GitHub 上阅读他们的代码,我不明白。

我的服务器上有一个文件的路径:String serverFilePath;

我希望能够从我的客户端下载它(最好使用RSocket 的 Aeron 实现)。有谁知道如何使用 RSocket 做到这一点?

提前致谢。

0 投票
0 回答
526 浏览

ubuntu-16.04 - 错误:构建 yarpl 时,目标“yarpl-tests”的依赖目标“gmock”不存在 - Ubuntu 16.04

我正在尝试构建在 github 中使用 rsocket 库获得的 YARPL 库。我按照自述文件中提到的说明进行操作。

但是在第三步中cmake ../ -DCMAKE_BUILD_TYPE=DEBUG,我得到了以下输出。

现在,我已经安装了 gmock,但它仍然给我同样的错误。我该怎么办?

PS我正在使用Ubuntu 16.04

0 投票
3 回答
1785 浏览

spring - 如何设计具有外部阻塞 API 调用的响应式微服务?

我有一些微服务,它们应该在 WebFlux 框架之上工作。每个服务器都有自己的带有 Mono 或 Flux 的 API。我们使用的是 Spring 支持的 MongoDB(Spring Data MongoDb Reactive)。

问题是外部阻塞 API,我必须在我的系统中使用它。

我有一个解决方案。我可以将阻塞 API 调用包装在专用线程池中,并将其与 CompletableFuture 一起使用。

还有什么可以解决我的问题吗?我认为,全新的 Rsocket 无法解决我的问题。

0 投票
1 回答
452 浏览

security - 安全使用 rsocket 的最佳实践是什么?

安全使用 rsocket 的最佳实践是什么(身份验证、授权)。这是 rsocket 固执己见的事情,还是其他人在 rsocket 之上构建的工作?

0 投票
1 回答
544 浏览

java - RSocket 适用于生成的数据,但不适用于 Spring Reactive MongoDB

分辨率总结:

在目前的大多数 RSocket 示例中,即使在 SpringBoot 相关教程中,服务器端接受器也只是简单地构造为一个新对象(如下面的 new MqttMessageService())。如果您在接受器类中生成示例内容,这很好,但当接受器依赖容器中的其他 bean 时,可能会导致以下依赖注入相关的混淆。

原始问题:

尝试通过 Rsocket 的 Java 服务器使用 Spring Data Reactive Mongodb 存储库流式传输数据库条目时出现 NullPointerException。

问题在于,在调试期间,所有组件都单独工作:我可以通过同一个 Mongodb 存储库获取请求的数据,我还可以使用 Rsocket 在同一服务器和客户端之间流式传输随机生成的数据。

所以我要么错过了一些非常基本的东西,要么一起使用 Reactive Mongodb 和 Rsocket 可能会出现问题。

这是原始的服务器端 Rsocket 配置

这是具有正确 DI的工作服务器端 Rsocket 配置:

这是服务器端 AbstractRSocket 实现,其中在返回 service.findAll() 时抛出 NullPointerException。

这是反应式存储库和相关服务。该服务在注入到服务器的 AbstractRSocket 实现时返回 null,但在注入其他类时工作正常:

这是与测试内容完美配合的客户端代码:

我在这里可能有点超出我的知识水平,并且关于该主题的资源非常有限,所以我很感激任何关于解决方案的提示:)

0 投票
3 回答
813 浏览

java - 有状态的 Rsocket 应用程序

在我的项目中,我想让多个客户端连接到服务。我正在使用 java Rsocket 实现。

服务应该为每个客户端维护一个状态。现在,我可以通过一些标识符来管理客户端。这个选项我已经实现了。但我不想使用字符串手动管理会话。

所以另一个想法是通过 Rsocket 连接来识别客户端。有没有办法使用 Rsocket 通道来识别特定的客户端?

想象一个示例服务和几个客户端。每个客户端都有 Rsocket 通道,服务启动并运行。有没有办法使用 Rsocket 通道在服务器端识别这些客户端?如果您可以展示这种行为的编程示例,那将是惊人的。谢谢!

编辑(更详细地描述案例)

这是我的例子。

我们目前使用了三个 CORBA 对象,如图所示:

  • LoginObject(通过 NamingService 检索到的引用)。客户端可以调用 login() 方法来获取会话
  • Session 对象有多种方法用于查询当前服务上下文的详细信息,最重要的是获取 Transaction 对象
  • Transaction 对象可用于通过将 commandName 和键值对列表作为参数的通用方法执行各种命令。在客户端执行 n 个命令后,他可以提交或回滚事务(也可以通过 Transaction 对象上的方法)。

在此处输入图像描述

所以这里我们使用会话对象在我们的服务上执行事务。

现在我们决定从 CORBA 迁移到 Rsocket。因此我们需要 Rsocket 微服务能够存储会话的状态,否则我们无法知道将要提交或回滚什么。这可以通过每个客户的单个发布者来完成吗?

0 投票
0 回答
177 浏览

java - Java Rsocket 流式客户端

这是有状态 Rsocket 应用程序线程的延续。这是我的例子。

我们目前使用了三个 CORBA 对象,如图所示:

  • LoginObject(通过 NamingService 检索到的引用)。客户端可以调用 login() 方法来获取会话
  • Session 对象有多种方法用于查询当前服务上下文的详细信息,最重要的是获取 Transaction 对象
  • Transaction 对象可用于通过将 commandName 和键值对列表作为参数的通用方法执行各种命令。在客户端执行 n 个命令后,他可以提交或回滚事务(也可以通过 Transaction 对象上的方法)。

在此处输入图像描述

所以这里我们使用会话对象在我们的服务上执行事务。为了用 rsocket 替换它,我们编写了一个简单的 protobuf:

这个想法是,一旦用户登录,他将开始将命令流式传输到服务器。最后,客户端将提交或删除更改。

所以这里是开始向服务器发出值的客户端代码:

如您所见,客户端将在 runClient 方法中登录,如果登录成功,客户端将执行 runCommands 方法,该方法只会发出一些值。

在我的服务器上,根据 protobuf,我创建了事务方法:

现在,当我运行它时,由于客户端上的阻塞错误,它无法工作:

但是我的问题不是它不起作用,而是我希望每个客户端都与服务器进行会话,并且对于每个客户端,服务器应该保持事务状态。这是完整代码的 git 链接:https ://github.com/oe19fyfa/rsocket-clientstream-example

所以在写完所有这些之后,我的问题是:建立事务会话的正确方法是什么?我知道我的代码非常业余,所以我愿意接受任何类型的建议?

0 投票
2 回答
2934 浏览

java - Rsocket服务器异常:没有目标''的处理程序(目标没有从客户端传递到服务器)

我为 RSocket 消息写了一个小演示

问题是我无法访问Rsocket端点,我从服务器收到以下异常:

客户端: 配置:

控制器:

服务器端(使用spring Rsocket): yml:

配置:

我很确定它与新wrap函数有关,RSocketRequester.wrap 因为它接受一个新参数metadataMimeType,我将它设置为 application/Json,但它似乎不起作用

堆栈跟踪:

org.springframework.messaging.MessageDeliveryException:在 org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:312) 在 org.springframework.messaging.handler.invocation.reactive 中没有目标处理程序。 AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445) at org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417) at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda$ handleAndReply$4(MessagingRSocket.java:173) 在 reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) 在 reactor.core.publisher.Mono.subscribe(Mono.java:3920) 在 reactor.core.publisher。FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207) 在 reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) 在 reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:74) 在 io.rsocket .RSocketResponder.handleRequestResponse(RSocketResponder.java:386) 在 reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) 在 reactor.core.publisher.RSocketResponder.handleFrame(RSocketResponder.java:298)。 MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) 在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) 在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)在 reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux。在 reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) 在 reactor.core.publisher 订阅(FluxGroupBy.java:696).Flux.subscribe(Flux.java:8000) 在 reactor.core.publisher .Operators$MonoSubscriber.complete(Operators.java:1582) 在 reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:316) 在 io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)在 reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy .java:316) 在 reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) 在 reactor.core.publisher.FluxMap$MapSubscriber。onNext(FluxMap.java:114) 在 reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) 在 reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206) 在 reactor.netty.channel .FluxReceive.onInboundNext(FluxReceive.java:322) 在 reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:342) 在 reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91) 在 io.netty。 channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty .handler.codec.ByteToMessageDecoder。fireChannelRead(ByteToMessageDecoder.java:328) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel .AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) 在 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) 在 io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421) 在 io。 netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) 在 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) 在 io.netty.channel。DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) 在 io.netty.channel.nio.NioEventLoop.run (NioEventLoop.java:511) 在 io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io。 netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)fireChannelRead(DefaultChannelPipeline.java:930) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) at io .netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop .java:511) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) 在 io.netty。 util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)fireChannelRead(DefaultChannelPipeline.java:930) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) at io .netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop .java:511) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) 在 io.netty。 util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)930) 在 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) 在 io.netty.channel.nio 的 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) .NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) 在 io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) 在io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable。在 java.base/java.lang.Thread.run(Thread.java:834) 处运行(FastThreadLocalRunnable.java:30)930) 在 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) 在 io.netty.channel.nio 的 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) .NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) 在 io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) 在io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable。在 java.base/java.lang.Thread.run(Thread.java:834) 处运行(FastThreadLocalRunnable.java:30)在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) 读取(AbstractNioByteChannel.java:163) .channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor. java:918) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java。 lang.Thread.run(Thread.java:834)在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) 在 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) 读取(AbstractNioByteChannel.java:163) .channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor. java:918) 在 io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.base/java。 lang.Thread.run(Thread.java:834)processSelectedKeysOptimized(NioEventLoop.java:632) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) at io.netty .util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable .java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)processSelectedKeysOptimized(NioEventLoop.java:632) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) at io.netty .util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable .java:30) 在 java.base/java.lang.Thread.run(Thread.java:834)netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread .java:834)netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread .java:834)

0 投票
1 回答
781 浏览

java - Spring Boot中如何映射RSocket的所有交互模型

RSocket 中提供了 4 种交互模型。

  • 开火即忘
  • 请求和响应
  • 请求流
  • 请求通道
  • (元数据推送)

Spring(和 Spring Boot)提供 RSocket 集成,使用现有的消息传递基础架构可以轻松构建 RSocket 服务器以隐藏原始 RSocket API。

而在客户端,RescoketRequester提供了一个与服务器握手的功能。

但是怎么用requestChannel 以 Spring 方式的metadataPush模型(使用消息传递基础架构)?

示例代码在Github上。更新:添加requestChannel示例。

更新SETUPMETADATA_PUSH可以由@ConnectMapping. Spring Security RSocket 可以保护SETUPREQUEST