问题标签 [spring-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
124 浏览

cassandra - 对于调用存储库方法时的spring数据反应cassandra是在返回通量之前将查询的整个结果加载到内存

我是响应式编程的新手。使用反应弹簧数据在cassandra / datastax上进行 POC 。我已经使用弹簧数据ReactiveCassandraRepository执行了一些选择查询。我的怀疑是关于内部工作。例如,当我触发 findAll 查询时,它会返回一个 Flux 并且我可以使用我的 REST 客户端代码查看数据。但我的疑问是,如果我有 100 条记录作为查询的结果,spring 数据是否会将这 100 条记录提取到内存中,然后将其作为 Flux 流式传输,或者实际上是在客户端获取每一行时,数据一直在流式传输从数据库。

0 投票
2 回答
769 浏览

java - 为什么在 Spring Cloud Stream 反应式使用者中遇到异常时我会收到 onComplete 信号?

我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 一起使用并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:

我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。在向我看到/信号的链添加.log()调用时,我希望看到信号。onNextonCompleteonError

我的实际代码如下所示:

我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是,onError使用 Spring Cloud Stream 时不会出现该信号。如果我只是myService.processMessage(msg)在单元测试中调用我的服务并模拟异常,我的反应链将正确传播错误信号。

当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?

在我的非平凡代码中,我确实注意到此错误消息可能与为什么我没有收到错误信号有关?

onError为了进一步混淆,如果我将 Spring Cloud Stream 绑定切换到非反应式实现,我能够在反应链中获得信号,如下所示:

0 投票
1 回答
318 浏览

java - Project Reactor - 长期运行的反应式 Kafka 消费者

有没有人有一个长期运行的反应式 Kafka 消费者的例子?https://github.com/reactor/reactor-kafka.git中的 SampleConsumer 在订阅后退出,这是预期的。但是我需要不断地接收来自我们的主题的消息,并且在发生故障时我应该重新连接(特别是连接问题)。

我确实通过https://github.com/CollaborationInEncapsulation/s1p-reactor-netty-kafka-twitter.git作为帮助提出示例但不是很成功。感谢帮助

0 投票
1 回答
26 浏览

spring-cloud-stream - 在没有任何消息代理的情况下,为 API 集成(直接使用 WebClient)提供反应式微服务是否有任何风险?

我正在实现微服务 API,但不打算拥有任何消息代理。API 服务将与 WebClient/web Flux 相互通信。由于技能/预算挑战,不使用任何消息代理。这样生产有风险吗?故障转移/重放等缺点是什么?

0 投票
1 回答
239 浏览

mono - 如何将我的 ReactiveSecurityContextHolder 角色传递给 mono webclient 休息调用?

我有带有自定义过滤器的 Spring Cloud Gateway。在这个过滤器中,我尝试使用参数调用我的 webclient。来自 ReactiveSecurityContextHolder 的角色。getAccessToken 结果是

com.xy.HeaderGatewayFilter$apply$1$1.apply(HeaderGatewayFilter.kt:37)。

如果我只返回 "getAccessToken(webClient, "fixString" ).flatMap(chain::filter)" 而没有实际角色,它的工作并返回一个 accessToken。

0 投票
0 回答
94 浏览

spring - 在命令式弹簧应用程序中处理弹簧反应器异常

我在命令式 Spring Boot 应用程序中使用 webflux。在这个应用程序中,我需要使用 webclient 对各种后端进行休息调用,并在继续下一步之前等待所有响应。

A类

B类

上面的代码按预期工作。Webclient 配置为在 5xx 和 4xx 上抛出错误,我可以使用 WebclientResponseException 捕获这些错误。

问题是我无法从反应框架中捕获任何异常。例如,我的 Web 客户端配置为使用指数退避重试并在耗尽时抛出异常,我无法在上面的 try catch 块中捕获它。我探索了使用 onErrorReturn 在 webclient 流中处理该异常的选项,但它不会将错误传播回我的订阅者。

我也无法将异常添加到 catch 块,因为它永远不会被代码的任何部分抛出。

任何人都可以建议处理这些类型的错误的最佳方法是什么。情景。我是 webflux 和响应式编程的新手。

0 投票
0 回答
438 浏览

spring - 从 spring-cloud-gateway 中的传入请求中捕获对象属性值

我有spring-cloud-gateway如下配置:

该路由预计将接受两种类型的请求:

第一个请求携带一个SavingsAccount对象,第二个请求携带一个StudentAccount对象。这两个对象都有一个userRef:String我需要提取的属性。是否有可能通过添加它以某种方式做到这cloud-gateway-configuration一点?

基本上,我试图找出如何从上面的 spring-cloud-gateway 配置中的传入请求携带的对象中提取属性。

这是库方法,我认为可以用来实现:

但我想用它来提取读取对象的属性之一的值,然后使用该值来调用将返回主机名的服务 - 最终必须从uri()方法中使用的值

这个方法org.springframework.cloud.gateway.route.builder看起来也很有用

0 投票
0 回答
81 浏览

java - 使用 Spring-Reactor 的 changeRequestUri() 的正确方法是什么?

有人可以提供一个例子并解释如何使用这种方法吗?我是 Reactor 项目的新手,我很难理解使用它的方式,而且那里似乎没有足够的教程。我想要的是调用一个服务,它将以字符串格式返回一个主机名,然后我想将其作为 uri 用户

这是方法的包和方法: