问题标签 [spring-reactor]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 将 Scala Future 转变为 Reactor Flux
我正在调用一个返回 a 的 API Future of Scala
,并且我想转换为Reactor Flux
显然没有阻塞并等待未来的响应。我正在尝试使用 EmitterProcessor,当未来成功时它工作正常,但不幸的是,当未来有超时时它不是。
这里的代码
我正在尝试做的是正确的?,还有更好的方法吗?
问候
reactive-programming - 如何在反应式 Spring WebClient 调用的错误部分引发异常?
如果发生错误,我希望以下方法引发自定义异常:
我还设置了一个单元测试,期望抛出 CustomException。不幸的是,测试失败了,异常被包裹在一个 Mono 对象中。这里还有测试代码供参考:
我正在使用MockWebServer来模拟测试中的错误。
那么,如果调用,我应该如何实现 doOnError 或 onError 部分,以使我的方法真正抛出异常?
spring-webflux - Mono flatMap + switchIfEmpty 组合运算符?
是否有允许处理结果/成功的运算符,无论 Mono 是否为空。例如:
在哪里:
是否有允许类似以下或类似内容的快捷方式运算符?
更具体地说,当 Mono 为空时将包含mono.someOperator()
返回值,否则返回值。Optional<Foo>
null
我想避免像上面提到的那样创建流程方法,并且只有一个代码块,但不确定哪个操作员可以在不重复块的情况下提供帮助。
spring - 我们可以将 Spring Webflux 与 spring data jdbc 模板一起使用吗?会有什么影响?在这种情况下我们不能使用 R2DBC 怎么办?
我是 Spring Web Flux 的新手,不知道如何将它与现有的 Spring jdbc 模板集成,该模板使用 Oracle 的阻塞持久性驱动程序。IE。在我的情况下,R2DBC (oracle) 不受支持/不可用。我必须将 oracle-jdbc 驱动程序与 spring data jdbc 一起使用。
java - 是什么让某个东西成为 DirectProcessor 的生产者?
从DirectProcessor,spring-reactor 3.x文档:
向零对多订阅者发送 onNext、onError 和 onComplete 信号。请注意,与多个消费者一起,DirectProcessor 的当前实现支持多个生产者。但是,所有生产者必须在同一个线程上产生消息,否则将违反 Reactive Streams Spec 合同。
上述声明中的生产者是什么?
- a
Publisher
接收DirectProcessor作为Publisher.subscribe方法的参数 - 呼唤的东西
DirectProcessor.onNext(T t)
- 调用
FluxSink<T>.next(T t)
FluxSink的结果DirectProcessor.sink()
spring-reactor - 完成另一种助焊剂后如何启动一种助焊剂?
完成 Flux 后如何启动 Flux?Flux 不依赖于 Flux 的结果。
我认为操作 then() 应该可以完成工作,但它只是返回单声道。那么正确的运算符是什么?
java - 将两个 Mono 组合在一起,其中第二个 Mono 订阅了第一个
我最近一直在使用 Java 中的反应器库和 Spring 框架来学习反应式编程,并且在大多数情况下我已经能够掌握它。但是,我发现自己有几次处于同样的境地,并希望得到一些关于我哪里出错的建议。
我苦苦挣扎的要点通常是我想用单声道做一些事情,比如找到一些补充数据,然后将其添加回原始单声道。zip 功能在我看来是理想的候选人,但我最终订阅了原始单声道两次,这不是我的意图。
这是我一直试图解决的情况类型的人为示例,因为我无法共享我的公司代码。它假设我们使用的是响应式数据库并设置了记录器,并且 Person 类是不可变的,但具有 with<FieldName> 方法。
我在运行这样的代码时看到的输出是:
这确实是有道理的,因为原始人 mono 已在两个地方订阅,我将其映射到familyMembersMono
和将它们压缩在一起时,但如果可以避免的话,我不想对存储库进行不必要的调用。
有人对处理这种行为的更好方法提出建议吗?
spring - Spring Reactor - 使用 Pageable 端点
这是我第一次使用 Spring Reactor,我面临以下挑战:
我有一项服务,它允许使用由页码和页面大小指定的许多记录:
GetContactsForGroupResponse 在其他字段中包含分页元数据:
现在我需要编写另一种方法来读取所有页面
并将结果合并到一个集合中:
到目前为止,我已经写过:
我逐页阅读结果,直到读到最后一页。我想知道从 SpringReactor 的角度来看,正确的实现方式是什么
任何意见,将不胜感激,
谢谢
spring-boot - 将自定义标头添加到 Spring Cloud Stream(使用 Spring Reactor)
作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)流式传输数据。在将消息发送到队列之前,我需要添加一些自定义标头。
我的 spring-cloud-stream 的配置是:
生产者参考:
标头已成功添加到消息中,但它在某处被覆盖并且没有传播给消费者。
有人可以分享他们对我们如何使用 Spring Cloud Stream 将自定义标头添加到消息的想法。
提前致谢!
java - 如何在通量\单声道上阻止和订阅
我有一个通量A,我需要编写一个方法B func(),以便该方法返回另一个通量B以便可以阻止它(用于测试目的)。但是,这个通量B已经必须订阅通量A。所以,当有人调用时:func().block(); 在测试中,那么测试应该等待通量A完成。(方法func()必须订阅A才能启动通量A 的执行)