问题标签 [spring-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
330 浏览

spring-boot - Schdulers.elastic 不在 Reactor 中创建新线程

我正在尝试创建一个流,其中通量发出 10 个项目,每个项目并行,每个项目休眠 1 秒。由于每个项目都在单独的线程上发布,我希望整个过程需要 1 秒。但是日志显示它需要 10 秒。

我尝试将 subscribeOn 更改为 publishOn,映射到 doOnNext。但它们似乎都不起作用。

我是 Reactor 的新手,正在尝试了解我哪里出错了。非常感激任何的帮助。谢谢

0 投票
2 回答
2679 浏览

spring - 如何在非反应式 Spring EventListener 和反应式 Flux 之间架起桥梁

Flux.push通过在 的 lambda 表达式中调用和使用 sink直接创建 Fluxpush与使用 a 提供的 sink 有什么区别DirectProcessor

在 Flux 只发出几个事件的最小示例中,我可以这样做

与使用DirectProcessor

澄清一下:我知道我可以Flux.just在这里使用,但我的用例实际上是在 Spring@EventListener和 Spring WebFlux 之间建立一座桥梁,我想为每个传入的 SSE 请求创建一个 Flux,然后将事件发布到这个通量。

谁能告诉我,如果这两种方法都有效?当然,一定有一些不同。特别是,Reactor Reference Guide部分的DirectProcessor状态:

另一方面,它有不处理背压的限制。因此,如果您通过它推送 N 个元素,但它的至少一个订阅者请求少于 N 个,则 DirectProcessor 会向其订阅者发出 IllegalStateException 信号。

这意味着什么?

[编辑:]在我使用的问题的早期版本中Flux.generate()Flux.push()这显然是错误的,因为 generate 最多可以创建一个事件。

[编辑 2:] @123 向我询问了我正在努力实现的目标的完整示例。请耐心等待,这是一个 SO 问题的大量代码:

我实际尝试做的完整示例

我想在(非反应性)Spring 域事件侦听器和反应性 Flux 之间建立一座桥梁,然后我可以在 WebFlux 端点中使用它来发布 SSE。为简洁起见,以下代码片段使用Lombok注释。

假设我最终希望将用户在入职流程中的状态发布为 SSE。这是枚举:

StateChangedEvents每当任何用户的状态发生变化时,非反应式业务逻辑都会发布:

这就是我最初的问题的来源。我将如何构建一个将这个域事件转换为 Flux 流的桥梁?我的要求:

  • 新客户端注册后,应立即推送进程的当前状态
  • 每当达到“终端”入职状态时,通量流应该终止。

这是我到目前为止所得到的:

最后是使用网桥的控制器:

我的桥接代码中有几个问题我无法解决:

  • 我真的必须在我的Subscriber实例上进行同步以避免从poll初始状态写入过时的事件吗?如果我不这样做,StateChange 事件确实会在从存储库中读取当前状态之前到达并发布,然后将其推送到无序状态。当然,在没有 synchronized 关键字的情况下,必须有一种更优雅的 Flux-ish 方式来处理这个问题。

  • 我们已经排除了Flux.generate,它似乎可以使用Flux.pushFlux.create会产生更多的 SSE 事件吗?为什么?我害怕,我不明白这三个之间的区别。

  • 而不是使用静态方法,Flux我应该在这里使用一个DirectProcessor或任何其他处理器吗?我是整个反应式堆栈的新手,Spring Reactor 文档对我来说太模糊了,tbh。再次:有什么区别?我上面提到的关于背压的评论呢?

0 投票
0 回答
723 浏览

netty - Spring Webflux 检测到 SSE 连接取消延迟

我想将 Spring Webflux 与 Server Sent Events 和 Netty 一起使用。我能够在客户端接收事件。但是当一个一个事件有很多,客户端取消订阅,那么服务端发送更多的事件,客户端没有收到。

年表:

这里的问题我不能确定事件 B 是否被传递,因为在发送消息的那一刻没有关闭连接的证据。那么,问题是我怎样才能避免这种情况呢?我如何知道在发送消息时是否已发送消息或连接是否实际打开?

这是我的控制器:

这是客户日志的尾部:

这是服务器日志的尾部:

0 投票
1 回答
4423 浏览

java - 如何在没有嵌套订阅的情况下组合/链接多个包含不同数据类型的 Mono/Flux

我们正在使用 project-reactor 从外部 web 服务中检索一些数据并生成一堆结果对象。

首先,我们需要获取一些触发下一次 Web 服务调用所需的主数据。在主数据可用后,我们会根据主数据的结果检索更多数据。接下来我们必须等待所有 Monos 发出它的结果。然后我们处理所有数据并构建我们的结果对象。

我们在反应流方面没有太多经验。我们的嵌套订阅解决方案有效,但我们相信可能有更好的方法来归档我们想要做的事情。

问题 1

Masterdata_A 和 Masterdata_B 可以并行获取,但是如何在不嵌套的情况下以反应方式表达呢?getFluxMasterdata_B 的每个结果都应该与 getMonoMasterdata_A 的一个结果相结合。

问题2

具有两个 Masterdata 的 Tupel 应该以某种方式受到限制,以免 Web 服务因许多数据请求而不堪重负。1 秒的实际延迟只是一个似乎可行的猜测,但最好定义第一个内部 flatMap 的最大并行执行数,以便一次最多有 N 个等待的 web 服务调用。

问题 3

将来我们可能必须从 Web 服务中获取更多数据来构建 ProcessingResult。是否有定义反应流以使其可读/可理解的最佳实践?反应流的嵌套可以还是应该避免(将所有内容保持在顶层)?


领域模型

WebserviceImpl

业务服务实现

0 投票
1 回答
1819 浏览

java - Spring webflux中的缓存

很惊讶在网上很难找到关于这个的具体信息。

是否可以在 spring webflux 中缓存结果?

例如:我可以有我的休息服务,然后使用 mongodb 作为主数据库,并使用 redis 作为缓存。因此,当请求通过时,它会检查缓存,然后如果请求的结果不在缓存中,它会查询 mongodb?

如果您有链接等,只需将它们作为评论,我会阅读它们并自己回答问题。

谢谢。

0 投票
2 回答
787 浏览

reactive-programming - 使用 2 个嵌套订阅返回 Mono/Flux

我需要为一个函数返回 Mono / Flux,但这有 2 个嵌套订阅。我正在寻找一个更好的解决方案,仅在这 2 个订阅值可用之后发布 Mono/Flux,然后执行一些操作来派生 finalValue。

最终目标是,函数 getFinalValue() 的订阅者应该能够订阅最终值。我对 Flux 也有类似的需求。最好的方法应该是什么?

0 投票
1 回答
682 浏览

spring-boot - 将返回一个 Mono导致(邪恶)同步,阻塞客户端/服务器通信?

我是 Spring Reactor 和 WebFlux 的新手,对 Spring 功能 web 中的事件流有点困惑。示例:我有一个处理函数返回一个Mono<ServerResponse>. 在其中,一个findAll()存储库方法被执行,返回一个Flux<T>. 为了遵守响应式宣言,为了实现异步、非阻塞并允许背压,我希望看到onNext()从存储库返回的每个元素都有一个。但是,在请求处理期间查看服务器日志,我只看到一个onNext()事件,这是有道理的,因为我的返回类型是Mono包含响应的:

路由器功能

处理函数

事件日志

相比之下,实现一个带有Flux<T>as 返回类型的经典 Spring 注释控制器方法,我将看到一个onNext()for 每个实例T(即结果集的每个项目),这对我来说看起来更“正确”(客户端现在可以控制事件流量等):

控制器

日志

这令人困惑。让我详细说明:

  • 从某种意义上说,使用Mono<ServerResponse>似乎是邪恶的,因为它将整个结果集封装在一个事件中,对我来说,这感觉就像打破了异步、非阻塞、启用背压的事件流的反应性原则。这不会从客户手中夺走控制权吗?对我来说,这看起来像是传统的、阻塞的客户端/服务器通信。
  • 直接返回Flux<T>感觉好多了,因为它支持按结果事件处理和背压控制。

我的问题是:

  • 创建一个 有什么影响Mono<ServerResponse>?这是否会导致阻塞、同步交互,onNext()仅在从 repo 中读取所有项目时才发出?我会失去背压功能等吗?
  • 如何让功能样式后端为onNext()结果集中的每个项目发送一个?
  • 就完全响应式(即非阻塞、异步和背压兼容)的函数式处理函数的返回类型而言,最佳实践是什么?我不确定是否Mono<ServerResponse>不违反这些反应性原则。

我可能完全错了,或者遗漏了一些重要的东西。谢谢你的帮助!

0 投票
1 回答
481 浏览

spring-webflux - 断路器未将状态从 HALF_OPEN 更改为 CLOSED

我的 spring-boot 反应式应用程序中有这个断路器配置 -

然后我像这样调用上游 API -

invokeUpstream方法如下所示-

后备方法只是抛出异常 -

现在,当上游 API 5 次返回500响应时,我可以看到 circuitBreaker 状态正在从 CLOSED 状态移动到 OPEN 状态,这是预期的。在 OPEN 状态下,它根据配置保持 30 秒。之后,它进入 HALF_OPEN 状态,问题从这里开始。即使上游 API 返回成功响应,它也永远不会进入 CLOSED 状态,它永远保持在 HALF_OPEN 状态。

我在我的应用程序中使用这些依赖项 -

即使浏览了所有文档,我也不确定自己做错了什么。

0 投票
1 回答
685 浏览

spring-reactor - 为什么 ConnectableFlux.connect() 阻塞?

我是 Spring Reactor 的新手。我一直试图了解ConnectableFlux类的工作原理。我已经阅读了文档并查看了在线发布的示例,但遇到了一个问题。

有人能告诉我为什么connect()方法会阻塞吗?我在文档中没有看到任何内容说它应该阻止..特别是因为它返回一个 Disposable 供以后使用。鉴于下面的示例代码,我永远不会通过 connect() 方法。

我试图基本上模拟我过去多次使用的旧式监听器接口范例。我想学习如何使用反应式流重新创建服务类和侦听器架构。我有一个简单的服务类,它有一个名为“ addUpdateListener(Listener l) ”的方法,然后当我的服务类“ doStuff() ”方法触发一些事件传递给任何监听器。

我应该说我将编写一个 API 供其他人使用,所以当我说 Service 类时,我并不是指 Spring 术语中的 @Service。这将是一个普通的 java 单例类。

我只是将 Spring Reactor 用于 Reactive Streams。我也在看 RxJava.. 但想看看 Spring Reactor Core 是否可以工作。

我从下面的测试类开始只是为了理解库语法,然后陷入了阻塞问题。

我认为这里描述了我正在寻找的内容:多个订阅者

更新:通过调试器运行我的代码,ConnectableFlux 连接方法中的代码永远不会返回。它挂在内部连接方法上,并且永远不会从该方法返回。

任何帮助都会很棒!

这也是我的 maven pom

运行上面的代码会给出以下输出..它只是以毫秒为单位打印时间,直到我 Ctrl-C 进程..

0 投票
2 回答
5618 浏览

java - 如何对 Flux 中的项目进行计数,如果计数大于 X,则返回错误,否则继续流水线

我是 Spring 的 Project Reactor 的新手,我不完全确定如何执行某些操作:

我有我的管道,管道返回记录。都好。

但我想计算这些记录,然后做一些事情(比如 if else),如果返回的记录 > X 则错误,否则继续。

知道 Count 返回 a Mono<Long>,那之后我会丢失记录,我该怎么办?

我在想:

不知何故在此平面图中使用flatMap和执行某些操作。不知何故,我看到reduceFlux 中有一种方法可能会有所帮助。

关键是,我不确定如何进行。