问题标签 [spring-reactor]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - Spring Reactor Webflux 调度程序并行性
对于完全非阻塞的端到端响应式调用,是否建议显式调用 publishOn 或 subscribeOn 来切换调度程序?对于 CPU 消耗或非消耗任务,总是使用并行通量来优化性能是否有利?
spring-cloud-gateway - Spring cloud gateway - 在将请求路由到下游系统之前使用 GlobalFilter 添加动态标头
我试图在将请求传递给下游之前为所有请求标头添加动态值。
我创建了一个用户定义的类来实现org.springframework.cloud.gateway.filter.GlobalFilter
在这个类中,我必须调用其他服务(2 个响应式 redis 和 1 个 api 调用)来检索需要在标题中设置的动态值。
这是我现在面临的问题,
GatewayFilterChain.filter(exchange) 订阅在其他单声道订阅(2 个 redis 和 1 个服务)完成之前完成。因此,在我改变交换之前,不添加标头的请求将传递到下游。
为了避免这个问题,我添加了单声道订阅的延迟
chain.filter(exchange).delaySubscription(Duration.ofMillis(300L))
但是我不想在系统中引入手动延迟,请您指导/建议我更好的解决方案,以便在其他单声道订阅完成之前不会订阅 GatewayFilterChain。
spring-webflux - 将 RabbitMQ 侦听器桥接到 Flux
我有一个响应式 Spring Boot 应用程序,它使用来自 RabbitMQ 的消息并将它们保存在(MongoDB)存储库中:
假设多条消息在短时间内到达,此代码可能会耗尽数据库的已配置 ConnectionPool。如果我会在 a 中接收消息Flux
,我可以将concatMap()
它们放入数据库或将它们插入到 n 个文档的存储桶中。
这就是为什么我尝试将给定的 RabbitMQ 侦听器连接到自我管理的 Flux 的原因:
这在本地和一段时间内有效,但在一段时间后(我预计 12-24 小时)它会停止在数据库中存储消息,所以我很确定我做错了什么。
将传入的 RabbitMQ 消息转换Flux
为消息的正确方法是什么?
java - 如何在文件行出现并将它们表示为 Flux 时读取它们?
假设我们依赖Reactor 3
(即在 Spring 5 应用程序中)和一个文本文件my/file.txt
.
我需要订阅文本文件行(现有的和将来会出现的)并创建一个Flux<String>
. 如果您愿意,忽略阻塞 IO 读取关注点,让我们来揭示构建此类订阅的原理。
为简单起见,假设我们将这些行打印到 std 输出:
什么是正确的实施方式Flux<String> flowLinesFrom(Path)
?
spring - Spring 3 RestTemplate 到 WebClient
我正在尝试使用 WebClient 迁移 Spring 版本 3 RestTemplate。但是在进行此迁移时出现此编译错误。
这是我要更改的 Spring RestTemplate 代码
用 WebClient 替换代码
这是POM文件配置
reactive-programming - WebFlux(Reactor)中的文件处理
我正在开发一个新的响应式项目,其中正在进行大量文件处理 IO。如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?boundedElastic 池大小会限制并发操作的数量吗?
如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?
project-reactor - Project Reactor + flatMap + 多个 onErrorComplete - 未按预期工作
当多个onErrorContinue添加到管道以处理从flatMap抛出的特定类型的异常时,异常处理无法按预期工作。
我希望下面的代码应该删除元素 1 到 6,并且订阅者应该使用元素 7 到 10。
这是我得到的输出:
问题:为什么第二个onErrorContinue
没有被调用,但是异常发送给了订阅者?
附加说明:
如果我删除 1st 和 2nd onErrorContinue
,那么所有异常都由 3rd 处理onErrorContinue
。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更清晰的异常处理而不是添加if..else
块。
这个问题与为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅有何不同?
1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题对线程没有任何关注,即使 add Thread.sleep(10000)
after . subscribe
,行为也没有变化。
java - 如何在控制器中获取通配符路径变量
我有一个控制器:
如何获取/name/
??之后的 URL 字符串(包括所有查询参数)?基本上我需要这个**
部分。当然我可以删除/name/
fromserverHttpRequest.getPath(..)
但有更好的方法吗?
java - 将反应流拆分为 2 个流,然后使用关联 ID 再次合并它们?
用户故事:
我有图像
一群有学生的学校
拥有多辆巴士出租的巴士公司
每个学校都可以从巴士公司租一辆巴士
- 1个或多个学校可以从巴士公司租用同一辆巴士
鉴于校车正在运行,一所学校一次接 10 名学生,并认为所有这些旅行都是在学校瞬间发生的(即:如果校车正在运行,那么所有学生都会立即到达各自的学校)
问题陈述
我有一个学校流,我需要将学校流分成两个流
每个学校的一组学生流
一大波公交车
然后对于公共汽车流,我需要从流中移除损坏的公共汽车,这样我就可以有一个工作校车流
最后,我需要重新合并给定珊瑚 id 的两个流(学校组流和公共汽车流)(即:属于学校 x 的学生组可以使用对学校 x 使用有效的总线 Y)
笔记
- 解决方案必须在java中
- 我最好使用 Reactor 项目,但可以随意使用 RXJava
- 感谢您抽出时间帮助我解决这样的问题:)
spring-webflux - 如何根据反应堆上下文更改 url webClient
基于已在 webfilter 上添加的国家/地区,我需要将请求修改到不同的服务器。
我已经将带有 webfilter 的上下文添加到了链中,我可以在这里看到它,但是我没有找到如何根据链上的信号上下文修改接收到的 clientRequest。如果有人已经找到了一种方法并且可以提供帮助: