问题标签 [spring-reactive]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1050 浏览

rx-java2 - rxjava2-jdbc:对 toIterable() 的迭代是阻塞的,线程 reactor-http-nio-2 不支持此功能

我正在做一个反应/通量/单休息服务。后端是 Oracle,使用 rxjava2-jdbc。

如何克服这个阻塞错误?

我正在通过示例学习 rx,因此很高兴了解防止感觉常规使用的列表操作的概念细节。

  1. Repository 从 rx/database 返回一个 Flux:

回购返回通量<>

  1. 处理程序尝试将该列表/通量添加到另一个 Protobuf 对象 SearchResponse,但失败。

AppHander 尝试将客户列表添加到 SearchResponse protobuf 对象

短栈恍惚:

转换 List<Proto.SearchResponse> 导致错误

完整的堆栈跟踪:

存储库.java

AppHandler.java

0 投票
0 回答
318 浏览

spring - Spring(Boot)中文件上传的响应式流式处理方法

我们在 inet 和 stackoverflow 上花费了很多时间,但是没有一个发现让我们在计划在 Spring 上下文中上传文件的方式上感到满意。

对我们的架构说几句话。我们有一个 node.js 客户端,它将文件上传到 Spring Boot 应用程序中。让我们将此 REST 端点称为“客户端端点”。我们的 Spring Boot 应用程序充当中间件并调用“外部系统”的端点,因此我们将此端点称为“外部”端点,因为区别。主要目的是这两个端点之间的文件处理以及它们之间的一些业务逻辑。

实际上,我们客户端的界面是这样的:

在这里我们非常灵活,因为它是我们的客户端和我们的接口定义。

由于在负载下我们的系统有时会耗尽内存的问题,我们计划将我们的代码重新组织为一种更加基于流的反应式方法。当我写“负载不足”时,我的意思是负载严重不足,例如同时上传数百个文件,其中大文件从至少几 MB 到最多 1GB。我们知道,这些测试并不代表真实的应用程序用例,但我们希望做好准备。

我们对我们的挑战进行了一些研究,最终我们使用分析器工具向我们展示了根据我们的 REST 端点,我们将文件作为字节数组完全存储在我们的内存中。仅此而已,但效率不高。

目前我们正面临这样的要求,即为文件上传提供一个 REST 端点并将这些文件推送到某个外部系统的另一个 REST 端点。这样做,我们的主要应用程序意图是成为文件上传的一些中间层。根据这种初始情况,我们希望不会将这些文件作为一个整体留在我们的记忆中。最好是一个流,也许是反应性的。我们已经对一些业务功能有部分反应,但在熟悉所有这些东西的一开始。

那么,到目前为止,我们的步骤是什么?我们引入了一个新的 Client (node.js --> Spring Boot) 接口,如下所示。到目前为止,这有效。但它真的是一种基于流的方法吗?第一个指标表明,这不会降低内存利用率。

第一个问题:这种类型 Mono<> 就在这里吗?还是我们应该更好地使用 DataBuffer 的 Flux 之类的?而且,如果是这样,客户端应该如何表现并以这种格式传递数据,它实际上是一种流式方法?

然后 FileService 类应该将此文件发布到外部系统中,也许对给定数据执行其他操作,至少记录 id 和文件名。:-) 我们在 FileService.save(..) 中的代码实际上如下所示:

不幸的是,第二个 REST 端点,我们的外部系统之一,看起来与我们的第一个没有什么不同。它将被来自另一个系统的数据丰富。它需要一些带有 id 和字节数组的 FileDO2 以及特定于第二个外部系统的其他一些元数据。

如前所述,我们的方法应该是最小化客户端和外部系统之间的操作的内存占用。有时我们不仅要向该系统提供数据,还要执行一些可能会减慢整个流式处理过程的业务逻辑。

有什么想法可以整体做到这一点吗?目前我们还没有线索来做这一切......我们感谢任何帮助或想法。

0 投票
1 回答
304 浏览

java - 即使存在值,也无法单独从 Redis 加载值

我正在使用 Reactive Redis,我试图将 Redis 用作数据库的缓存。我正在检查缓存中是否存在值?如果存在则返回它,否则如果结果返回则查询数据库;存储结果缓存它并返回它。

但是,即使 Redis 中存在值,它仍然一直在查询数据库。

但是,即使 Redis 中存在值,调用也总是会访问数据库。我在这里做错了什么?

0 投票
2 回答
63 浏览

java - 可以在 Java 中传递指向 Lambda 的引用变量

我正在尝试一些 Spring 反应式代码,以下是相关的代码

上面的最后一行在 Intellj IDE 中给出了编译错误,说没有接受该类型的方法。

但是,如果我将其转换为以下内容:

这工作正常。为什么它不能接受指向我在第二段代码中传递的同一个 lambda 表达式的引用变量?

0 投票
0 回答
90 浏览

lambda - 如何对控制器中创建的无限 Flux Stream 进行单元测试以在 Spring Web-Flux Unit Test for controller 中进行计时

有没有办法使用 VirualTimeScheduler/ 或任何其他测试方法来测试在控制器方法中创建的无限通量流。例如,我在 RestController 中有以下代码

我不知道如何验证每个元素在 1 秒后依次返回。就像我改变控制器中的延迟一样,测试时间间隔的特定测试应该失败。我搜索了很多但徒劳无功。我有一些想法,因为 getBody() 被阻塞,我不能使用 virtualTimescheduler。但是如何解决这个问题呢?

0 投票
1 回答
1854 浏览

spring - Spring WebClient:使用 WebFlux.fn + reactor-addons 重试

我正在尝试WebClient使用 Kotlin Coroutines + WebFlux.fn + reactor-addons 添加条件重试:

还在重试之前添加条件

调用看起来像:

0 投票
1 回答
612 浏览

reactive-programming - 反应式 redis 连接中缺少异步支持

我正在学习使用 spring webflux,作为其中的一部分,我开发了一个使用 Redis 来保存和检索数据的应用程序。但我面临的问题是,当请求尝试连接到 redis 时,我收到以下错误:

我已经对这个问题进行了足够多的搜索,并试图找到任何有用的东西。

这是我的 Redis 配置类:

这是我的 redis 查找类,它实际上从 redis 获取和保存数据

控制器 :

pom.xml

请帮我确定问题。提前致谢 !

0 投票
1 回答
721 浏览

rxjs - 在分页期间从 springframework.data.repository.reactive.ReactiveCrudRepository 返回 Mono 时出现“不满足的依赖关系”异常

上下文:我成功创建了一个带有“org.springframework.data.domain.Pageable”参数的方法,旨在返回一个 Flux。当涉及 Pageable 时,我只找到了指导返回 Flux 而不是 Mono 的文章。到目前为止,一切都很好。

个人知识/假设:如果它只是一个页面,返回一个 Flux 是没有意义的。在其他世界中,它不是返回多个结果的流。好吧,如果我可以从 100 页中要求从第 2 页到第 10 页,我会看到一些关于禁止使用 Mono 的观点。据我了解,retrieveAllPaged bellow 中的真实事件是 0 事件或 1(从不超过 1)。

这是使用通量返回的方法:

暂定:因为它永远不会返回超过 1 我试过:

我得到了一个例外,它似乎表明 Mono 是不可接受的:

我的问题和直截了当的问题:分页时真的不可能返回 Mono 吗?如果是这样,相关评论将是为什么如果我从前端第二次调用它将是一个全新的搜索?我的意思是,例如,据我所知,从 RxJs 调用两次永远不会导致第二次调用重用第一次调用创建的通量。

完整的日志

0 投票
3 回答
330 浏览

spring-boot - Schdulers.elastic 不在 Reactor 中创建新线程

我正在尝试创建一个流,其中通量发出 10 个项目,每个项目并行,每个项目休眠 1 秒。由于每个项目都在单独的线程上发布,我希望整个过程需要 1 秒。但是日志显示它需要 10 秒。

我尝试将 subscribeOn 更改为 publishOn,映射到 doOnNext。但它们似乎都不起作用。

我是 Reactor 的新手,正在尝试了解我哪里出错了。非常感激任何的帮助。谢谢

0 投票
2 回答
2679 浏览

spring - 如何在非反应式 Spring EventListener 和反应式 Flux 之间架起桥梁

Flux.push通过在 的 lambda 表达式中调用和使用 sink直接创建 Fluxpush与使用 a 提供的 sink 有什么区别DirectProcessor

在 Flux 只发出几个事件的最小示例中,我可以这样做

与使用DirectProcessor

澄清一下:我知道我可以Flux.just在这里使用,但我的用例实际上是在 Spring@EventListener和 Spring WebFlux 之间建立一座桥梁,我想为每个传入的 SSE 请求创建一个 Flux,然后将事件发布到这个通量。

谁能告诉我,如果这两种方法都有效?当然,一定有一些不同。特别是,Reactor Reference Guide部分的DirectProcessor状态:

另一方面,它有不处理背压的限制。因此,如果您通过它推送 N 个元素,但它的至少一个订阅者请求少于 N 个,则 DirectProcessor 会向其订阅者发出 IllegalStateException 信号。

这意味着什么?

[编辑:]在我使用的问题的早期版本中Flux.generate()Flux.push()这显然是错误的,因为 generate 最多可以创建一个事件。

[编辑 2:] @123 向我询问了我正在努力实现的目标的完整示例。请耐心等待,这是一个 SO 问题的大量代码:

我实际尝试做的完整示例

我想在(非反应性)Spring 域事件侦听器和反应性 Flux 之间建立一座桥梁,然后我可以在 WebFlux 端点中使用它来发布 SSE。为简洁起见,以下代码片段使用Lombok注释。

假设我最终希望将用户在入职流程中的状态发布为 SSE。这是枚举:

StateChangedEvents每当任何用户的状态发生变化时,非反应式业务逻辑都会发布:

这就是我最初的问题的来源。我将如何构建一个将这个域事件转换为 Flux 流的桥梁?我的要求:

  • 新客户端注册后,应立即推送进程的当前状态
  • 每当达到“终端”入职状态时,通量流应该终止。

这是我到目前为止所得到的:

最后是使用网桥的控制器:

我的桥接代码中有几个问题我无法解决:

  • 我真的必须在我的Subscriber实例上进行同步以避免从poll初始状态写入过时的事件吗?如果我不这样做,StateChange 事件确实会在从存储库中读取当前状态之前到达并发布,然后将其推送到无序状态。当然,在没有 synchronized 关键字的情况下,必须有一种更优雅的 Flux-ish 方式来处理这个问题。

  • 我们已经排除了Flux.generate,它似乎可以使用Flux.pushFlux.create会产生更多的 SSE 事件吗?为什么?我害怕,我不明白这三个之间的区别。

  • 而不是使用静态方法,Flux我应该在这里使用一个DirectProcessor或任何其他处理器吗?我是整个反应式堆栈的新手,Spring Reactor 文档对我来说太模糊了,tbh。再次:有什么区别?我上面提到的关于背压的评论呢?