问题标签 [spring-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
3569 浏览

java - 如何测试返回 Mono 的函数其中有另一个 Mono在使用 Reactor 的 StepVerifier

我有一个这样的 ServiceWebClientInterface.java

MyClass.java

我正在尝试使用 StepVerifier 对 myMethod 进行单元测试,但我无法在 myMethod 的内部 lambda 函数中执行语句。

MyClassTest.java

请在上面找到完整的代码,当我运行测试时,我观察到内部 lambda 函数没有被使用步骤验证器调用。

0 投票
1 回答
2632 浏览

spring-webflux - Webflux 链接多个 Mono

我是响应式编程(Spring webflux)的新手,想知道如何最好地处理这个用例。我有一个返回 Mono 的反应式服务调用 ( getAccount ),我想将它与另一个返回Mono<Set>的服务调用getBooks和一个执行某种转换并返回类似Mono<Set>的最终同步调用转换链接起来。当getAccountgetBooks返回空时记录警告时,我会以Mono<Set>的形式链接并返回转换后的数据吗?这是我试图做的一个简化版本的例子。

鉴于这里有一些虚假服务

我想获取给定用户的帐户并找到他/她借的所有书籍并转换书籍并将值作为 Mono<Set> 返回值,同时在适当的地方记录警告

这是我的开始

但是,我不确定混合响应式和流式是否是一件坏事,而且它看起来不正确。

0 投票
1 回答
599 浏览

java - org.springframework.boot.web.reactive.error.DefaultErrorAttributes 在 org.springframework.boot:2.3.1.RELEASE 中不再被调用

弹簧反应式网络:

在 Spring Boot 2.2.* 的早期,我使用了一个扩展 DefaultErrorAttributes 的类。此类用于全局处理整个微服务的异常。当我升级到 2.3.1 时,它不再工作了。我在 2.3.1 版本中没有发现 spring 响应式 web 有任何重大变化。有什么改变可以打破这个吗?我们需要改变什么吗?有输入吗?

似乎不再调用 DefaultErrorAttributes 了。示例代码在这里。

}

0 投票
1 回答
686 浏览

java - Spring Gateway AsyncPredicate 不适用于反应器和通量

我们为 Spring-Gateway 编写了一个自定义 Predicate 工厂来路由请求。我们正在解析 XML 请求的正文,然后根据正文中存在的特定方法派生路由。在执行此操作时,我们编写了以下代码来创建 ServerRquest。

使用旧版本Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE)完美运行此解决方案。但是使用最新版本的Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6)我得到以下异常。网关应用程序正常启动,没有任何错误。

有没有其他人也有同样的问题并且知道如何解决这个问题?

0 投票
0 回答
157 浏览

spring-webflux - 重试与 flatMap 一起使用时会抛出异常

如果服务出现任何异常,我想重试。但是当使用 retryWhen 时出现异常 java.lang.IllegalStateException: UnicastProcessor 只允许单个订阅者。

无需重试,它工作正常

retryStrategy的定义如下:

0 投票
1 回答
97 浏览

spring - 如何使用多个网络调用和有条件的信号完成 flatMapMany?

我需要编写一个方法

  • Location从端点获取标头
  • 生成一系列Some,每个都应该从Location从第一个端点获取的内容中检索。
  • 需要返回一个Flux<Some>

看起来像这样。

如何使用完成属性将其映射Mono<String>到一段时间?Flux<Some>Some#status

0 投票
1 回答
642 浏览

spring-boot - 如何增加reactor WebClient的工作线程?

我正在使用 Spring reactor Webflux 2.3.1

我正在使用 webClient 进行一些 http 调用。

(org.springframework.web.reactive.function.client.WebClient)

如何增加工作线程、IO 线程的数量以及事件循环的数量?

0 投票
1 回答
900 浏览

spring - Webflux 控制器'返回对象而不是 Mono'

您好,我是 Webflux 的新手,我遵循构建反应式微服务的教程。在我的项目中,我遇到了以下问题。

我想为产品服务创建一个 crud api,下面是 Create 方法

问题是,当我从邮递员调用此方法时,我收到错误 “block()/blockFirst()/blockLast() 正在阻塞,线程 reactor-http-nio-3 不支持”但是当我使用 StreamListener这个电话工作正常。流侦听器从 rabbit-mq 通道获取事件

流监听器

我有两个问题。

  1. 为什么这适用于 StreamListener 而不是简单的请求?
  2. webflux 中是否有适当的方法来返回 Mono 的对象,或者我们总是必须返回 Mono?
0 投票
0 回答
251 浏览

java - Aws Kinesis 消费消息反应式

我正在尝试实现 kinesis 消费者反应式。我目前的理解是,一个运动分片总是被一个线程消耗,所以消耗它的线程将订阅。

这是我的代码:

我认为关于 kinesis 有很多我不了解的细节,即使我继续阅读它也没有变得更清楚。

文档说这个连接将有一个低延迟的 HTTP/2 连接,但我不知道如何在运行之间处理这个,我的意思是如果我重新启动应用程序我不想处理已经消费的消息。如何为响应式版本实现 RecordProcessorCheckpointer?

同样在几分钟后,该过程退出而没有任何错误。我记得在我需要重新订阅的文档中阅读过。如果我在无限循环中运行是否足够:responseHandlerBuilderReactor(client, request).join();

0 投票
1 回答
442 浏览

java - 使用带有多个 switchIfEmpty 的 Spring Reactor Mono

我有一个简单的 java 验证流程,像这个例子:

我试图链接方法调用以摆脱,ifs但这不起作用:

问题是,即使isValid()代码继续失败并且第二个会switch覆盖第一个。

我怎样才能使代码工作并保留链接?