问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
759 浏览

project-reactor - Reactor - Flux requires non filter subscriber

I am using Reactor and am am creating a flux to which I am publishing some events. My issue is that the subscribers that I create with filters fail after a while unless I add a non filter subscriber on the flux.

The code works fine if I an extra subscriber.

Any ideas about what I am doing wrong?

Best regards

0 投票
1 回答
685 浏览

java - 如何使用 Mono.dematerialize()?

作为主题,我不明白用例是什么以及如何使用它。

深入研究源代码似乎是将单声道本身Mono<T>Mono<Signal<X>.

0 投票
1 回答
416 浏览

java - Mono.elapse 不适用于 StepVerifier?

根据其 Javadoc,Mono.elapse()将产生Mono<Tuple2<Long, T>>第一个值是订阅和第一个下一个信号之间经过的时间。

以下测试不起作用

它会抛出异常:

我原以为经过的时间至少为 1000 毫秒,但结果只有 11 毫秒。

我在这里想念什么吗?

0 投票
1 回答
96 浏览

project-reactor - Spring Reactor 和 Netflix RxJava 中的并行运算符

RxJava 库曾经有一个已停产的并行运算符,但是当前的 Spring Reactor 项目有一个并行运算符

是什么阻止了 RxJava 实现并行运算符?

我可以看到有一个名为RxJavaParallel的项目正在尝试执行此操作,但我无法理解为什么以这种方式处理它?当前的 RxJava 实现是否存在固有的设计限制,这使得它变得更加困难?

更新

我希望解决的问题

@akarnokd 指出的这个PR正是我想要的!想知道为什么它一开始就不在那里:)

更新

感谢@akarnokd 的链接和回复,我想正是这种态度让图书馆对像我这样的人来说很简单。如果您在主 RxJava 项目中找不到所需的功能,扩展项目值得一看

0 投票
1 回答
520 浏览

java - 应该单声道.materialize 返回通量>?

根据其 Javadoc Mono<T>.materialize()

将传入的 onNext、onError 和 onComplete 信号转换为 Signal。由于错误被具体化为信号,因此将停止传播并发出 onComplete。完成信号将首先发出 Signal.complete() 然后有效地完成通量。

这意味着当没有发生错误时,Publisher返回的materialize应该至少发出 2 个信号:

  • Signal.next
  • Signal.complete

但是,此方法的返回类型Mono<Signal<T>>仅允许发出单个事件。所以我很困惑。代码应该是

0 投票
1 回答
630 浏览

java - Mono.subscribe(consumer, errorConsumer, completeConsumer, subscriptionConsumer) 不调用消费者和完成消费者?

作为主题,第 4 个变体Mono.subscribe似乎没有调用成功消费者和完整消费者。它只调用订阅消费者。

下面的代码失败了

0 投票
2 回答
1523 浏览

java - 将 io.projectreactor 版本从 2.0.x 升级到 3.0.4 - 使用 Spring 框架

我在尝试升级时遇到问题。

目前我使用的是 2.0.x 版本,特别是 -

我正在使用 Maven,并且我对“projectreactor”有一个依赖项-

升级到 3.0.4.RELEASE 版本时,为了继续使用我之前使用的所有东西,我需要显式导入 -

但我还是不见了

我不知道该怎么做。

0 投票
1 回答
1247 浏览

spring - 项目反应堆处理器 v3.X

我们正在尝试从 2.X 迁移到 3.X。 https://github.com/reactor/reactor-core/issues/375 我们在我们的应用程序中使用了 EventBus 作为事件管理器(低延迟 FX 系统),它非常适合我们。

更改后,我们决定采用每个模块并创建自己的处理器来处理事件。1. 从您的角度来看,这种用法是否正确?因为在当前阶段缺乏文档,并且在审查了所有内容之后,我们真的不知道在这里做什么 2. 我们尝试使用 Flux 以便每隔 X 间隔执行一次操作 例如:市场在 1 秒内到达 1000但我们希望在一秒钟内只处理 4 次更新。升级后我们正在使用:

带有缓冲区的处理器并发送到另一个方法。在这种方法中,我们有 Flux 获取列表并尝试并行工作以完成他的任务。我们遇到了 2 个主要问题: 1. 有时我们收到 Null 事件,我们找不到我们的系统正在发送给我想也许我们错过了使用处理器

在上面的例子中,处理程序中的事件有时会为空。一旦他做了流停止工作,直到我们重新启动服务器(因为只有在重新启动时我们才会创建处理器)

2.我们尝试过并行,但有时一些消息消失了,所以也许我们滥用了框架

这样做的目的是处理器将每 250 毫秒唤醒一次并调用处理程序。处理程序将与 Flux 并行工作,以进行更好和更快的处理。*如果 DoBlockingWork 花费超过 250 毫秒,我无法理解会发生什么行为

更新: EventBus 由我们包装,每个订阅的事件都会抛出包装的事件管理器。现在我们尝试为每个模块创建事件处理器,但它的工作速度非常慢。我们已经将 TopicProcessor 与 ThreadExecutor 一起使用,但仍然很慢.. EventBus 高速完成了同样的工作有人知道吗?顺便说一句,当我尝试使用 DirectProcessor 时,它似乎比 TopicProcessor 工作得更好

0 投票
1 回答
541 浏览

spring - Spring 5 Web Reactive - 关于 DTO 响应正文转换的说明

根据文档

响应正文可以是以下之一:

  • Account — 在不阻塞给定 Account 的情况下进行序列化;意味着一个同步的、非阻塞的控制器方法。

为什么同步控制器方法可以是非阻塞的?听起来很矛盾。

0 投票
2 回答
1180 浏览

project-reactor - 反应堆 3 的“间隔缓冲区”通量?

如何使用现有的 Flux 运算符使 Flux 将传入值返回到多个列表中,并且返回之间的延迟最小?

'间隔缓冲区'通量