问题标签 [project-reactor]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
project-reactor - Reactor - Flux requires non filter subscriber
I am using Reactor and am am creating a flux to which I am publishing some events. My issue is that the subscribers that I create with filters fail after a while unless I add a non filter subscriber on the flux.
The code works fine if I an extra subscriber.
Any ideas about what I am doing wrong?
Best regards
java - 如何使用 Mono.dematerialize()?
作为主题,我不明白用例是什么以及如何使用它。
深入研究源代码似乎是将单声道本身Mono<T>
从Mono<Signal<X>
.
java - Mono.elapse 不适用于 StepVerifier?
根据其 Javadoc,Mono.elapse()
将产生Mono<Tuple2<Long, T>>
第一个值是订阅和第一个下一个信号之间经过的时间。
以下测试不起作用
它会抛出异常:
我原以为经过的时间至少为 1000 毫秒,但结果只有 11 毫秒。
我在这里想念什么吗?
project-reactor - Spring Reactor 和 Netflix RxJava 中的并行运算符
RxJava 库曾经有一个已停产的并行运算符,但是当前的 Spring Reactor 项目有一个并行运算符
是什么阻止了 RxJava 实现并行运算符?
我可以看到有一个名为RxJavaParallel的项目正在尝试执行此操作,但我无法理解为什么以这种方式处理它?当前的 RxJava 实现是否存在固有的设计限制,这使得它变得更加困难?
更新
我希望解决的问题
@akarnokd 指出的这个PR正是我想要的!想知道为什么它一开始就不在那里:)
更新
感谢@akarnokd 的链接和回复,我想正是这种态度让图书馆对像我这样的人来说很简单。如果您在主 RxJava 项目中找不到所需的功能,扩展项目值得一看
java - 应该单声道.materialize 返回通量>?
根据其 Javadoc Mono<T>.materialize()
:
将传入的 onNext、onError 和 onComplete 信号转换为 Signal。由于错误被具体化为信号,因此将停止传播并发出 onComplete。完成信号将首先发出 Signal.complete() 然后有效地完成通量。
这意味着当没有发生错误时,Publisher
返回的materialize
应该至少发出 2 个信号:
Signal.next
Signal.complete
但是,此方法的返回类型Mono<Signal<T>>
仅允许发出单个事件。所以我很困惑。代码应该是
java - Mono.subscribe(consumer, errorConsumer, completeConsumer, subscriptionConsumer) 不调用消费者和完成消费者?
作为主题,第 4 个变体Mono.subscribe
似乎没有调用成功消费者和完整消费者。它只调用订阅消费者。
下面的代码失败了
java - 将 io.projectreactor 版本从 2.0.x 升级到 3.0.4 - 使用 Spring 框架
我在尝试升级时遇到问题。
目前我使用的是 2.0.x 版本,特别是 -
我正在使用 Maven,并且我对“projectreactor”有一个依赖项-
升级到 3.0.4.RELEASE 版本时,为了继续使用我之前使用的所有东西,我需要显式导入 -
和
但我还是不见了
我不知道该怎么做。
spring - 项目反应堆处理器 v3.X
我们正在尝试从 2.X 迁移到 3.X。 https://github.com/reactor/reactor-core/issues/375 我们在我们的应用程序中使用了 EventBus 作为事件管理器(低延迟 FX 系统),它非常适合我们。
更改后,我们决定采用每个模块并创建自己的处理器来处理事件。1. 从您的角度来看,这种用法是否正确?因为在当前阶段缺乏文档,并且在审查了所有内容之后,我们真的不知道在这里做什么 2. 我们尝试使用 Flux 以便每隔 X 间隔执行一次操作 例如:市场在 1 秒内到达 1000但我们希望在一秒钟内只处理 4 次更新。升级后我们正在使用:
带有缓冲区的处理器并发送到另一个方法。在这种方法中,我们有 Flux 获取列表并尝试并行工作以完成他的任务。我们遇到了 2 个主要问题: 1. 有时我们收到 Null 事件,我们找不到我们的系统正在发送给我想也许我们错过了使用处理器
在上面的例子中,处理程序中的事件有时会为空。一旦他做了流停止工作,直到我们重新启动服务器(因为只有在重新启动时我们才会创建处理器)
2.我们尝试过并行,但有时一些消息消失了,所以也许我们滥用了框架
这样做的目的是处理器将每 250 毫秒唤醒一次并调用处理程序。处理程序将与 Flux 并行工作,以进行更好和更快的处理。*如果 DoBlockingWork 花费超过 250 毫秒,我无法理解会发生什么行为
更新: EventBus 由我们包装,每个订阅的事件都会抛出包装的事件管理器。现在我们尝试为每个模块创建事件处理器,但它的工作速度非常慢。我们已经将 TopicProcessor 与 ThreadExecutor 一起使用,但仍然很慢.. EventBus 高速完成了同样的工作有人知道吗?顺便说一句,当我尝试使用 DirectProcessor 时,它似乎比 TopicProcessor 工作得更好