问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2022 浏览

java - 如何正确使用 Reactor Publisher

我无法弄清楚如何使用 Reactor 正确实现发布者/订阅者场景。我有一个可行的解决方案,但实施对我来说似乎不正确:

我的问题是我需要手动实现发布者来注册订阅者并传递事件:

然后,我有一个 WorkQueue 处理器(应该是消费者):

它工作正常,但丑得要命。在这个取自 Spring Guides 的示例中,他们使用 EventBus 将事件从发布者路由到消费者,但是,当我尝试将它与我的处理器链接时,我得到了这个编译器错误:

我在这里迷路了,将发布者与处理器联系起来的最佳方式是什么?你会推荐使用 EventBus 吗?如果是这样,正确的调用是什么?

谢谢!

0 投票
1 回答
487 浏览

java - 使用反应器同时消费一个集合

有时必须在我参与的项目中实施经典的并发生产者 - 消费者解决方案,几乎可以减少从多个线程填充一些集合并由多个消费者使用的集合。简而言之,集合说有界到 10k 个实体,一旦达到缓冲区大小,就会提交消耗这 10k 个实体的工作任务,这样的工作人员有一个限制,说它设置为 10,在最坏的情况下,这意味着我可以拥有到 10 个工人,每个工人消耗 10k 个实体。

我确实必须在这里进行一些锁定,并且对缓冲区溢出进行一些检查(当生产者在所有工作人员忙于处理其块时生成过多数据的情况)因此必须丢弃新事件以避免OOM(不是最好的解决方案,但稳定性是p1 ;))

这些天一直在寻找 reactor 和使用它的方法,而不是降低级别并执行上述所有操作,所以愚蠢的问题是:“reactor 可以用于这个用例吗?” 现在忘记溢出/丢弃..我怎样才能为广播公司实现 N 个消费者?

正在寻找带有缓冲区+线程池调度程序的广播器:

我的意图是让广播公司在达到缓冲区大小时以异步方式执行 log(..),但是看起来它总是在阻塞模式下执行 log(...)。执行 100 一次完成下一个 100 等等.. 我怎样才能让它异步?

谢谢 vyvalyty

0 投票
1 回答
3735 浏览

java - 项目反应堆通量的并发处理

我对整个项目反应器或反应式编程非常陌生,所以我可能做错了什么。我正在努力建立一个执行以下操作的流程:

给定一个类实体:

  1. 从数据库中读取实体 ( ListenableFuture<Entity> readEntity())
  2. 对每个项目执行一些并行异步处理 ( boolean processItem(Map.Entry<String, String> item))
  3. 当所有完成调用 doneProcessing( void doneProcessing(boolean b))

目前我的代码是:

事情有效,但handler::processItem调用不会在所有项目上同时运行。我尝试使用dispatchOnandpublishOnioandasync SchedulerGroup和各种参数,但调用仍然在一个线程上串行运行。我究竟做错了什么?

此外,我相信总体上可以改进上述内容,因此任何建议都会受到赞赏。

谢谢

0 投票
2 回答
1622 浏览

java - Flux.take(x) 或flux.all(..) 不等待所有排放

我有一个fluxIterable8 个元素组成的 ( Flux.fromIterable(..))。对于每个通量发射,我想异步调用一个方法。我尝试了各种方法dispatchOnpublishOn但都没有奏效,最终我解决了map(CompletableFuture.supplyAsync(..), executor)将转换fluxflux<CompletableFuture<Boolean>>.

现在我只想在最后一项完成时继续流程。我尝试了all(..), 和 withtake(size of the Iterable)但在这两种情况下,在所有元素完成之前流程都会继续。我认为这是因为我的执行程序只有 4 个线程,并且需要一些时间才能将CompletableFuture-s 添加到通量中。

为什么不all(..)take(8)等待助焊剂完成?我怎样才能让它等待?

编码:

0 投票
5 回答
25942 浏览

java - 执行 Flux.map() 时如何处理错误

我试图弄清楚在 Flux 中映射元素时如何处理错误。

例如,我将一个 CSV 字符串解析为我的一个业务 POJO:

其中一些行可能包含错误,所以我在日志中得到的是:

我在 API 中阅读了一些错误处理方法,但大多数是指返回“错误值”或使用后备 Flux,如下所示:

但是,使用我的myflux方法意味着再次处理整个助焊剂。

那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?

使用@akarnokd 解决方法更新

但是,这很有魅力,因为您可以看到代码没有以前那么优雅了。Flux API 没有任何方法可以执行此代码的操作吗?

0 投票
2 回答
1345 浏览

java - reactor-core - java.lang.IllegalStateException:队列已满?!在 Hot Publisher (ConnectableFlux) 上

到目前为止,我一直在使用 RxJava,但我开始使用 projectreactor.io 中的 reactor-core,因为它符合反应流规范。

在下面的测试中,我创建了一个生成随机数的热通量 (ConnectableFlux)。我立即 connect() 它并预取 256 个值(我可以在日志中看到 258 个)。我等待 5 秒来模拟订阅者要等到一段时间后才会订阅。

主线程唤醒后,RnApp 订阅 ConnectableFlux randomNumberGenerator.subscribe(new RnApp());,. 然后RnApp.onSubscribe()调用并请求 10 个元素。之后,java.lang.IllegalStateException: Queue full?!引发异常(RnApp.onError()被调用),为什么?

订户

发布者测试

日志

0 投票
0 回答
230 浏览

java - 什么是 Reactor (projectreactor.io) 待办事项?

许多Reactor Dispatchers 允许您指定“积压”,例如:

SingleThreadDispatcher(int backlog)

ThreadPoolExecutorDispatcher(int poolSize, int backlog)

我发现一些文档说:

有人可以帮助我理解这意味着什么以及合理的价值可能是什么?

0 投票
0 回答
282 浏览

java - Spring reactor 中的任务协调与同步

我们正在使用 spring reactor 框架,工作是根据任务执行的。有一个主要任务,它会创建多个子任务。当所有子项的执行完成时,我需要使用主任务生成一些事件。我知道 Java 并发构造,它可以使用 countdownlatch 等来实现。但我已经读过,应该使用 Streams 和 Promises 来完成反应器框架的任务协调。

需要一些有用的链接和示例,为我提供有关 Spring reactor 中任务协调的更多信息。

0 投票
0 回答
494 浏览

spring - 了解反应堆的 FluxProcessor.wrap(upstream,downstream)

处理器(RxJava 中的主题)既充当发布者又充当订阅者,因此它们可以订阅发布者,此外,还可以被订阅,以便传递从顶级订阅者获得的值:

如何FluxProcessor.wrap()适应这个模式?例如,我想创建一个从 a 获取值并且可以订阅以获取值的FluxProcessorwith 。FluxProcessor.wrapFlux.range()

0 投票
1 回答
850 浏览

java - spring mvc 与反应流集成

我在 spring mvc 上构建了一个 RESTful api 应用程序。

最近我在 spring mvc 和响应式流(如 rxjava 和 project-reactor)之间进行了一些集成,并尝试使应用程序更具响应性。

我刚刚构建了一些如下所示的演示:

1.对于 rxjava,我使用 PublishSubject

2.用于项目反应堆

在运行这两个演示时,我看不出仅使用 java 的 CompletableFuture 在控制器方法之间提供太多区别。

我所理解的反应流和我想要的是将 servlet 请求视为一个流并将其与诸如背压之类的特性结合起来。

我想知道: 1. 有没有更好的方法让应用程序更具反应性?2.将spring mvc与反应流集成是否正确或兼容?如果是,我怎样才能执行诸如背压之类的功能?

我意识到也许我忘了声明为什么/如何在控制器中返回一个可完成的未来,实际上我注入了一个自定义的 MethodReturnValueHandler 来将 CompletableFuture 转换为 DefferdResult。