问题标签 [spring-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2716 浏览

spring - Spring Reactor Webflux 调度程序并行性

对于完全非阻塞的端到端响应式调用,是否建议显式调用 publishOn 或 subscribeOn 来切换调度程序?对于 CPU 消耗或非消耗任务,总是使用并行通量来优化性能是否有利?

0 投票
0 回答
755 浏览

spring-cloud-gateway - Spring cloud gateway - 在将请求路由到下游系统之前使用 GlobalFilter 添加动态标头

我试图在将请求传递给下游之前为所有请求标头添加动态值。

我创建了一个用户定义的类来实现org.springframework.cloud.gateway.filter.GlobalFilter

在这个类中,我必须调用其他服务(2 个响应式 redis 和 1 个 api 调用)来检索需要在标题中设置的动态值。

这是我现在面临的问题,

GatewayFilterChain.filter(exchange) 订阅在其他单声道订阅(2 个 redis 和 1 个服务)完成之前完成。因此,在我改变交换之前,不添加标头的请求将传递到下游。

为了避免这个问题,我添加了单声道订阅的延迟

chain.filter(exchange).delaySubscription(Duration.ofMillis(300L))

但是我不想在系统中引入手动延迟,请您指导/建议我更好的解决方案,以便在其他单声道订阅完成之前不会订阅 GatewayFilterChain。

0 投票
0 回答
500 浏览

spring-webflux - 将 RabbitMQ 侦听器桥接到 Flux

我有一个响应式 Spring Boot 应用程序,它使用来自 RabbitMQ 的消息并将它们保存在(MongoDB)存储库中:

假设多条消息在短时间内到达,此代码可能会耗尽数据库的已配置 ConnectionPool。如果我会在 a 中接收消息Flux,我可以将concatMap()它们放入数据库或将它们插入到 n 个文档的存储桶中。

这就是为什么我尝试将给定的 RabbitMQ 侦听器连接到自我管理的 Flux 的原因:

这在本地和一段时间内有效,但在一段时间后(我预计 12-24 小时)它会停止在数据库中存储消息,所以我很确定我做错了什么。

将传入的 RabbitMQ 消息转换Flux为消息的正确方法是什么?

0 投票
1 回答
191 浏览

java - 如何在文件行出现并将它们表示为 Flux 时读取它们?

假设我们依赖Reactor 3(即在 Spring 5 应用程序中)和一个文本文件my/file.txt.

我需要订阅文本文件行(现有的和将来会出现的)并创建一个Flux<String>. 如果您愿意,忽略阻塞 IO 读取关注点,让我们来揭示构建此类订阅的原理。

为简单起见,假设我们将这些行打印到 std 输出:

什么是正确的实施方式Flux<String> flowLinesFrom(Path)

0 投票
1 回答
174 浏览

spring - Spring 3 RestTemplate 到 WebClient

我正在尝试使用 WebClient 迁移 Spring 版本 3 RestTemplate。但是在进行此迁移时出现此编译错误。

这是我要更改的 Spring RestTemplate 代码

用 WebClient 替换代码

这是POM文件配置

0 投票
1 回答
1587 浏览

reactive-programming - WebFlux(Reactor)中的文件处理

我正在开发一个新的响应式项目,其中正在进行大量文件处理 IO。如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?boundedElastic 池大小会限制并发操作的数量吗?

如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?

0 投票
1 回答
421 浏览

project-reactor - Project Reactor + flatMap + 多个 onErrorComplete - 未按预期工作

当多个onErrorContinue添加到管道以处理从flatMap抛出的特定类型的异常时,异常处理无法按预期工作。

我希望下面的代码应该删除元素 1 到 6,并且订阅者应该使用元素 7 到 10。

这是我得到的输出:

问题:为什么第二个onErrorContinue没有被调用,但是异常发送给了订阅者?

附加说明: 如果我删除 1st 和 2nd onErrorContinue,那么所有异常都由 3rd 处理onErrorContinue。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更清晰的异常处理而不是添加if..else块。

这个问题与为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅有何不同?

1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题对线程没有任何关注,即使 add Thread.sleep(10000)after . subscribe,行为也没有变化。

0 投票
1 回答
269 浏览

java - 如何在控制器中获取通配符路径变量

我有一个控制器:

如何获取/name/??之后的 URL 字符串(包括所有查询参数)?基本上我需要这个**部分。当然我可以删除/name/fromserverHttpRequest.getPath(..)但有更好的方法吗?

0 投票
0 回答
114 浏览

java - 将反应流拆分为 2 个流,然后使用关联 ID 再次合并它们?

用户故事:

我有图像

一群有学生的学校

拥有多辆巴士出租的巴士公司

每个学校都可以从巴士公司租一辆巴士

  • 1个或多个学校可以从巴士公司租用同一辆巴士

鉴于校车正在运行,一所学校一次接 10 名学生,并认为所有这些旅行都是在学校瞬间发生的(即:如果校车正在运行,那么所有学生都会立即到达各自的学校)

问题陈述

  • 我有一个学校流,我需要将学校流分成两个流

    1. 每个学校的一组学生流

    2. 一大波公交车

  • 然后对于公共汽车流,我需要从流中移除损坏的公共汽车,这样我就可以有一个工作校车流

  • 最后,我需要重新合并给定珊瑚 id 的两个流(学校组流和公共汽车流)(即:属于学校 x 的学生组可以使用对学校 x 使用有效的总线 Y)

  • 笔记

    • 解决方案必须在java中
    • 我最好使用 Reactor 项目,但可以随意使用 RXJava
    • 感谢您抽出时间帮助我解决这样的问题:)
0 投票
2 回答
405 浏览

spring-webflux - 如何根据反应堆上下文更改 url webClient

基于已在 webfilter 上添加的国家/地区,我需要将请求修改到不同的服务器。

我已经将带有 webfilter 的上下文添加到了链中,我可以在这里看到它,但是我没有找到如何根据链上的信号上下文修改接收到的 clientRequest。如果有人已经找到了一种方法并且可以提供帮助: