问题标签 [spring-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
745 浏览

java - 将 Scala Future 转变为 Reactor Flux

我正在调用一个返回 a 的 API Future of Scala,并且我想转换为Reactor Flux显然没有阻塞并等待未来的响应。我正在尝试使用 EmitterProcessor,当未来成功时它工作正常,但不幸的是,当未来有超时时它不是。

这里的代码

我正在尝试做的是正确的?,还有更好的方法吗?

问候

0 投票
3 回答
4727 浏览

reactive-programming - 如何在反应式 Spring WebClient 调用的错误部分引发异常?

如果发生错误,我希望以下方法引发自定义异常:

我还设置了一个单元测试,期望抛出 CustomException。不幸的是,测试失败了,异常被包裹在一个 Mono 对象中。这里还有测试代码供参考:

我正在使用MockWebServer来模拟测试中的错误。

那么,如果调用,我应该如何实现 doOnError 或 onError 部分,以使我的方法真正抛出异常?

0 投票
2 回答
1154 浏览

spring-webflux - Mono flatMap + switchIfEmpty 组合运算符?

是否有允许处理结果/成功的运算符,无论 Mono 是否为空。例如:

在哪里:

是否有允许类似以下或类似内容的快捷方式运算符?

更具体地说,当 Mono 为空时将包含mono.someOperator()返回值,否则返回值。Optional<Foo>null

我想避免像上面提到的那样创建流程方法,并且只有一个代码块,但不确定哪个操作员可以在不重复块的情况下提供帮助。

0 投票
1 回答
1114 浏览

spring - 我们可以将 Spring Webflux 与 spring data jdbc 模板一起使用吗?会有什么影响?在这种情况下我们不能使用 R2DBC 怎么办?

我是 Spring Web Flux 的新手,不知道如何将它与现有的 Spring jdbc 模板集成,该模板使用 Oracle 的阻塞持久性驱动程序。IE。在我的情况下,R2DBC (oracle) 不受支持/不可用。我必须将 oracle-jdbc 驱动程序与 spring data jdbc 一起使用。

0 投票
0 回答
86 浏览

java - 是什么让某个东西成为 DirectProcessor 的生产者?

DirectProcessorspring-reactor 3.x文档:

向零对多订阅者发送 onNext、onError 和 onComplete 信号。请注意,与多个消费者一起,DirectProcessor 的当前实现支持多个生产者。但是,所有生产者必须在同一个线程上产生消息,否则将违反 Reactive Streams Spec 合同。

上述声明中的生产者是什么?

  1. aPublisher接收DirectProcessor作为Publisher.subscribe方法的参数
  2. 呼唤的东西DirectProcessor.onNext(T t)
  3. 调用FluxSink<T>.next(T t)FluxSink结果DirectProcessor.sink()
0 投票
2 回答
225 浏览

spring-reactor - 完成另一种助焊剂后如何启动一种助焊剂?

完成 Flux 后如何启动 Flux?Flux 不依赖于 Flux 的结果。

我认为操作 then() 应该可以完成工作,但它只是返回单声道。那么正确的运算符是什么?

0 投票
1 回答
694 浏览

java - 将两个 Mono 组合在一起,其中第二个 Mono 订阅了第一个

我最近一直在使用 Java 中的反应器库和 Spring 框架来学习反应式编程,并且在大多数情况下我已经能够掌握它。但是,我发现自己有几次处于同样的境地,并希望得到一些关于我哪里出错的建议。

我苦苦挣扎的要点通常是我想用单声道做一些事情,比如找到一些补充数据,然后将其添加回原始单声道。zip 功能在我看来是理想的候选人,但我最终订阅了原始单声道两次,这不是我的意图。

这是我一直试图解决的情况类型的人为示例,因为我无法共享我的公司代码。它假设我们使用的是响应式数据库并设置了记录器,并且 Person 类是不可变的,但具有 with<FieldName> 方法。

我在运行这样的代码时看到的输出是:

这确实是有道理的,因为原始人 mono 已在两个地方订阅,我将其映射到familyMembersMono和将它们压缩在一起时,但如果可以避免的话,我不想对存储库进行不必要的调用。

有人对处理这种行为的更好方法提出建议吗?

0 投票
1 回答
89 浏览

spring - Spring Reactor - 使用 Pageable 端点

这是我第一次使用 Spring Reactor,我面临以下挑战:

我有一项服务,它允许使用由页码和页面大小指定的许多记录:

GetContactsForGroupResponse 在其他字段中包含分页元数据:

现在我需要编写另一种方法来读取所有页面

并将结果合并到一个集合中:

到目前为止,我已经写过:

我逐页阅读结果,直到读到最后一页。我想知道从 SpringReactor 的角度来看,正确的实现方式是什么

任何意见,将不胜感激,

谢谢

0 投票
1 回答
2286 浏览

spring-boot - 将自定义标头添加到 Spring Cloud Stream(使用 Spring Reactor)

作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)流式传输数据。在将消息发送到队列之前,我需要添加一些自定义标头。

我的 spring-cloud-stream 的配置是:

生产者参考:

标头已成功添加到消息中,但它在某处被覆盖并且没有传播给消费者。

有人可以分享他们对我们如何使用 Spring Cloud Stream 将自定义标头添加到消息的想法。

提前致谢!

0 投票
1 回答
34 浏览

java - 如何在通量\单声道上阻止和订阅

我有一个通量A,我需要编写一个方法B func(),以便该方法返回另一个通量B以便可以阻止它(用于测试目的)。但是,这个通量B已经必须订阅通量A。所以,当有人调用时:func().block(); 在测试中,那么测试应该等待通量A完成。(方法func()必须订阅A才能启动通量A 的执行)