问题标签 [spring-reactive]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4423 浏览

java - 如何在没有嵌套订阅的情况下组合/链接多个包含不同数据类型的 Mono/Flux

我们正在使用 project-reactor 从外部 web 服务中检索一些数据并生成一堆结果对象。

首先,我们需要获取一些触发下一次 Web 服务调用所需的主数据。在主数据可用后,我们会根据主数据的结果检索更多数据。接下来我们必须等待所有 Monos 发出它的结果。然后我们处理所有数据并构建我们的结果对象。

我们在反应流方面没有太多经验。我们的嵌套订阅解决方案有效,但我们相信可能有更好的方法来归档我们想要做的事情。

问题 1

Masterdata_A 和 Masterdata_B 可以并行获取,但是如何在不嵌套的情况下以反应方式表达呢?getFluxMasterdata_B 的每个结果都应该与 getMonoMasterdata_A 的一个结果相结合。

问题2

具有两个 Masterdata 的 Tupel 应该以某种方式受到限制,以免 Web 服务因许多数据请求而不堪重负。1 秒的实际延迟只是一个似乎可行的猜测,但最好定义第一个内部 flatMap 的最大并行执行数,以便一次最多有 N 个等待的 web 服务调用。

问题 3

将来我们可能必须从 Web 服务中获取更多数据来构建 ProcessingResult。是否有定义反应流以使其可读/可理解的最佳实践?反应流的嵌套可以还是应该避免(将所有内容保持在顶层)?


领域模型

WebserviceImpl

业务服务实现

0 投票
1 回答
1819 浏览

java - Spring webflux中的缓存

很惊讶在网上很难找到关于这个的具体信息。

是否可以在 spring webflux 中缓存结果?

例如:我可以有我的休息服务,然后使用 mongodb 作为主数据库,并使用 redis 作为缓存。因此,当请求通过时,它会检查缓存,然后如果请求的结果不在缓存中,它会查询 mongodb?

如果您有链接等,只需将它们作为评论,我会阅读它们并自己回答问题。

谢谢。

0 投票
1 回答
246 浏览

java - Subscription to request body not emmitting values in Spring Reactive

I have been strugling against this for two days now, and finally reach a dead end.

Thing is, I'm handling a request like this:

#xA;

And the name is never logged. All I get is:

#xA;

But the actual onNext never happens

Any suggestion about what I'm missing?

0 投票
1 回答
1512 浏览

java - SSE `this.eventSource.onmessage` 调用失败。错误`“EventSource 的响应具有不是“text/event-stream”的 MIME 类型(“application/json”)

Angular Server Sent Event调用失败,我在 Chrome 开发工具(附图片)中看到this.eventSource.onmessage一个错误,返回了两个。"EventSource's response has a MIME type ("application/json") that is not "text/event-stream". Aborting the connection."Content-TypeChrome 开发工具响应网络选项卡

后端代码:Spring Reactor/REST

前端:角

方法this.eventSource.onmessage报错EventSource's response has a MIME type ("application/json") that is not "text/event-stream". Aborting the connection.

任何帮助都会很棒!

0 投票
1 回答
2874 浏览

java - 依赖的 webclient 调用 - Spring Reactive

我正在尝试进行两个 API 调用,第二个 API 调用取决于第一个 API 响应。下面的代码给出了第一个 weblient 调用的响应。这里我没有得到第二个 API 调用的响应。在日志中,我可以看到对第二个 Web 客户端调用的请求甚至不是从 onSubscribe() 开始的。你能告诉我我做错了什么吗?

更新 2:

我已将代码更改为 Functions 并使用 Flux 而不是流迭代。我现在面临的是,所有迭代都在 doSecondCall 方法中被过滤掉。请参考我在 doSecondCall 方法中的评论。因此不会触发第二次呼叫。如果我不应用过滤器,则会触发诸如“issue/null”之类的请求,这也会导致我的服务中断。

[使用 WebFlux 的反应式编程如何处理依赖的外部 api 调用] @Thomas- 另外,刚刚找到了这个线程。他基本上说除非你阻止第一个电话,否则没有办法声明第二个电话。是这样吗?

0 投票
1 回答
28 浏览

spring - Spring Cloud Stream 访问原始 Stream

这是我的用例,用户使用 websocket(带有订阅的 GraphQl)订阅我的流,我需要通过用户 ID 返回一个org.reactivestreams.Publisher(应该是我的 kafka 主题订阅)过滤消息的实例。

为了说明,像这样:

0 投票
2 回答
787 浏览

reactive-programming - 使用 2 个嵌套订阅返回 Mono/Flux

我需要为一个函数返回 Mono / Flux,但这有 2 个嵌套订阅。我正在寻找一个更好的解决方案,仅在这 2 个订阅值可用之后发布 Mono/Flux,然后执行一些操作来派生 finalValue。

最终目标是,函数 getFinalValue() 的订阅者应该能够订阅最终值。我对 Flux 也有类似的需求。最好的方法应该是什么?

0 投票
0 回答
422 浏览

spring - 手动配置 Spring Boot Reactive Mongo

我正在尝试设置具有多个数据库的 Spring Boot 应用程序的 PoC。应该使用哪个 db 由 Spring Profile 决定。这个数据库之一是 MongoDb,我使用 Spring Data Reactive 连接到它。

因为我希望应用程序仅在某个配置文件处于活动状态时才连接到 mongo,所以我禁用了 MongoAutoconfigurationMongoReactiveAutoConfiguration

我设置了一个 DbConfig,它扩展了AbstractReactiveMongoConfiguration并且仅在 mongo 配置文件处于活动状态并且具有EnableReactiveMongoRepositories注释时使用

只有这样,应用程序才能正常工作,只有在 mongo 配置文件处于活动状态时才会创建与 mongo 的连接。但是,当我尝试使用ReactiveCrudRepository时,例如:

然后它找不到CharacterCrudRepository Bean 并在启动时失败

当我删除自动配置的排除项时,存储库工作正常,但我不能将其仅限于 mongo 配置文件(即使所有 mongo bean 都限制在配置文件中,应用程序也会尝试连接到 mongo)。

0 投票
1 回答
682 浏览

spring-boot - 将返回一个 Mono导致(邪恶)同步,阻塞客户端/服务器通信?

我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能 web 中的事件流有点困惑。示例:我有一个处理函数返回一个Mono<ServerResponse>. 在其中,一个findAll()存储库方法被执行,返回一个Flux<T>. 为了遵守响应式宣言,为了实现异步、非阻塞并允许背压,我希望看到onNext()从存储库返回的每个元素都有一个。但是,在请求处理期间查看服务器日志,我只看到一个onNext()事件,这是有道理的,因为我的返回类型是Mono包含响应的:

路由器功能

处理函数

事件日志

相比之下,实现一个带有Flux<T>as 返回类型的经典 Spring 注释控制器方法,我将看到一个onNext()for 每个实例T(即结果集的每个项目),这对我来说看起来更“正确”(客户端现在可以控制事件流量等):

控制器

日志

这令人困惑。让我详细说明:

  • 从某种意义上说,使用Mono<ServerResponse>似乎是邪恶的,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、启用背压的事件流的反应性原则。这不会从客户手中夺走控制权吗?对我来说,这看起来像是传统的、阻塞的客户端/服务器通信。
  • 直接返回Flux<T>感觉好多了,因为它支持按结果事件处理和背压控制。

我的问题是:

  • 创建一个 有什么影响Mono<ServerResponse>?这是否会导致阻塞、同步交互,onNext()仅在从 repo 中读取所有项目时才发出?我会失去背压功能等吗?
  • 如何让功能样式后端为onNext()结果集中的每个项目发送一个?
  • 就完全响应式(即非阻塞、异步和背压兼容)的函数式处理函数的返回类型而言,最佳实践是什么?我不确定是否Mono<ServerResponse>不违反这些反应性原则。

我可能完全错了,或者遗漏了一些重要的东西。谢谢你的帮助!

0 投票
1 回答
49 浏览

java - 保存新关系时关系变得疏离

早上好,我正在使用我通过 spring-data-neo4j-rx-spring-boot-starter 版本 1.0.0-beta04 添加的 SDN-RX,我的 neo4j 数据库是版本 4.0.2 企业版。我有一个 ClassificationDomain 节点类型,并定义了一个 ClassificationDomain 类型的 regionClassificationDomain,我附加了一个 RegionType 类型的“大陆”节点,大陆节点是更多区域类型的层次结构的根节点,如下所示:

我现在正在定义实际的区域节点,这些节点也是分层的,需要附加到它们相应的区域类型,如下所示

我的问题是,当我保存一个新的区域节点时,比如说我想将一个子区域附加到“Harare”,许多其他关系在 regionType 层次结构和区域层次结构上都分离了。

我错过了什么或做错了什么?

我的存储库正在扩展 ReactiveNeo4jRepository。

地区 POJO

我的分类 POJO

区域控制器,我省略了导入。

在此先感谢您的帮助。