问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
317 浏览

spring - 使用 Spring Reactor 丢失消息

在我的 Spring 应用程序中,我似乎丢失了发送到反应器的消息。我怀疑这是由于在 Spring 容器的生命周期中我开始产生消息的时间。

在我的特定用例中,有一个 Spring Bean 在启动时执行一些初始化。此初始化的一部分涉及生成发送到 Reactor 的消息。

我怀疑 Spring 注册消费者的顺序存在问题,并且初始化代码在消费者注册之前已经运行。如果我在启动后手动运行代码,例如向调用相同代码的控制器发出请求,则消息会将其发送给消费者。

我尝试了各种方法来更改初始化代码运行的生命周期中的时间,例如ApplicationListener<ContextRefreshedEvent>,方法,实现ApplicationListener<ContextStartedEvent>等。这些方法似乎都不起作用。@PostConstructInitializingBean

我的消费者使用@reactor.spring.annotation.Consumer 注释,方法使用@reactor.spring.annotation.Selector 注释。如果相关,我使用Spring Boot 和自动配置的Reactor (@EnableReactor)。

0 投票
1 回答
233 浏览

java - 对 Stream 的终端调用永远不会执行

我很难使用 Spring Reactor Stream API(类似于 rxjava)在我的服务中构造一个响应对象,该对象包装由两个下游服务提供的响应。

以下是accept()我的 Channel Consumer 上的方法。为了保护无辜者,一些名字已被更改。

因此,aFooRequest包装了 many BarRequests,每个都有一个关联的 Classify 请求和一个关联的 Validate 请求。我们想要 1) 转换为 a FooRequest,2) 转换FooRequest为一系列BarRequests, 3) 为每个 运行两个下游请求BarRequest, 4) 将我们所有的对象聚合BarResponse成一个整体响应, 5) 将响应发送回客户端。

我遇到问题的地方是toList()方法,它似乎永远不会执行。每次我尝试一些涉及 a 的事情时,Promise它似乎总是会崩溃,这也不例外。

FooRequestFunctionBarRequestStreamFunction相当简单,似乎运行良好。他们的方法签名是:

和:

DownstreamRequestZipFunction看起来像这样:

只要两个下游请求函数都返回结果,这似乎工作正常。

最后,链式调用结束时的 Consumer 具有以下签名:

它的作用是 await() 响应承诺,然后将所有这些响应聚合到单个 XML 文档中写回通道。我可以说执行永远不会达到这种方法,因为没有任何日志记录会触发。这一切似乎都停止在 .toList() 上。

有谁知道为什么这个设置似乎会执行toList()或之后的任何事情?

编辑:好的,我有更多信息。在为应用程序中的每个线程提供命名约定以使调试更容易之后,我可以看到“shared-1”,运行 accept() 方法的线程进入 WAITING 状态,然后停留在那里。这可能与底层 Dispatcher 是一个单线程的环形缓冲区调度程序这一事实有关。

我修改了代码,使方法略有不同,并使用了多线程调度程序,并避免使用 a Promise,但我仍然有一个状态,即链式调用集的尾部不会执行。见下文:

在上面,我将 toList() 替换为对 reduce() 的调用,并将所有内容折叠成一个List<BarResponse>. 我可以看到这个执行和记录很好。但是,无论我对最后一次调用做什么,在尝试了 consume()、consumeOn() 等之后 - 它永远不会执行,也永远不会记录您在上面看到的最终调用。

在 VisualVM 中,我可以看到调度程序线程都在与阻塞队列关联的同一个对象监视器上等待 - 换句话说,它们都在等待工作到达。就像 tail consumeOn() 调用被完全忽略一样。

我在这里做错了什么?我不明白什么?

编辑 2:鉴于 Johns 在下面的回复,我怀疑问题出在服务器设置上。可能仅适用于 reactor 版本 2.0.0.M2,在主Application类中配置如下:

没有为此配置调度程序,它似乎在后台使用 LMAX 破坏器,而不是NettyEventLoopDispatcher. 目前尚不清楚如何设置NettyEventLoopDispatcher和使用它作为替代调度程序。

0 投票
1 回答
787 浏览

spring-integration - 反应堆StringUtils上的Spring集成xml文件错误

我有一个 spring 集成 sftp 流,我将其作为子上下文加载到我的整个应用程序上下文中。这是基于动态 ftp SI 示例。我的集成流程与反应器或其中的流无关。它是一个简单的流程,其中一个direct channel连接sftp-outbound-gateway到将文件传输到 sftp 服务器。我什至可以运行单元测试并且流程工作正常(能够传输文件)但是当我运行一个加载完整父应用程序的集成测试然后使用其中加载的这个 sftp 流程初始化子上下文时,它会抛出一个错误无法找到 reactor/StringUtils 类。

原因似乎是 spring-integration-sftp 将 reactor jar 作为瞬态 deps 加载,但由于我的父应用程序已经在类路径中加载了不同版本的 reactor,我从 spring 集成 dep 中排除了 reactor-core。如果我不从 spring-integration 中排除 reactor-core,那么会有一些版本冲突,所以我想排除它。

初始化 SI 流程

我运行集成测试时的错误

org.springframework.beans.factory.BeanDefinitionStoreException:从类路径资源 [adapters/sftp.xml] 解析 XML 文档时出现意外异常;嵌套异常是 java.lang.NoClassDefFoundError: reactor/util/StringUtils at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:414) at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions( XmlBeanDefinitionReader.java:336) at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:304) at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:181)

最后是 SI 流程


更新以添加我的反应器配置

0 投票
1 回答
110 浏览

spring - spring 将 sftp 与反应器集成

这个问题是我之前发布的问题的延续。我想创建一个 github 项目来确认行为并可能找出某种解决方案。这是github项目

reactor 2.x 版本中似乎包含了对 reactor 的 Spring 支持,并且 spring 集成 sftp 使用了 reactor 的 1.1.4 版本并引用了一个 StringUtils 类,该类已在 reactor 2+ 版本中移动到不同的包中。

似乎这个版本冲突在 SI 4.2 中解决之前是不可避免的,所以我希望得到一些关于如何使用 reactor 1.1.4 并重新创建与 spring reactor 支持相同/相似的反应器配置的建议(在这个项目)。这样 SI sftp 也可以工作。

0 投票
1 回答
2314 浏览

spring - 反应堆消费者的多线程

我正在使用反应器在整个应用程序中发布事件,并让不同的消费者响应他们的事件。

这是我的反应器配置

我期望使用默认的基于环形缓冲区的调度程序,并且应该并行处理发送给单个消费者的多条消息。相反,它似乎是以同步方式处理事件。线程shared-1用于处理我的event1到consumer1,然后只有在完成event1的处理后,同一个线程才开始在consumer1上处理event2。

如何以一种我应该能够将多个事件发送给多个消费者并且所有事件都并行处理的方式来实现并行处理。

我将不胜感激任何建议。

这就是我将事件分派到事件总线的方式

这是其中一位消费者

0 投票
1 回答
464 浏览

java - 反应堆线程池调度程序

我浏览了反应堆文档,但我不清楚如何控制在任何给定时间调度的事件数量。我想做的是发送一堆事件,足以让一些消费者忙碌,但有办法知道我目前有足够的消费者在工作,所以我不应该再发送任何事件。我在想如果我将调度程序设置为使用线程池,那么在某些时候它不会分配更多线程并可能抛出拒绝异常。这样,我可以继续调度事件,直到收到线程拒绝异常。

有什么方法可以做到这一点,或者我是否以不应该使用的方式使用反应器。

0 投票
1 回答
566 浏览

spring - 增加 Spring reactor AsyncTaskExecutor 中的线程数

如何在使用线程池时增加线程数我的代码如下,

添加以下代码并没有增加线程数,默认固定为机器上的核心数。

如何为我的反应器变量设置线程数?

如果我删除createReactorbean,反应器工作正常,只是默认为RingBuffer单线程。使用该 bean 和 的规范THREAD_POOL,将启动与机器上的内核数相等的线程。我只是想看看如何手动增加该计数...

谢谢

0 投票
2 回答
7090 浏览

java - Reactor Spring 处理异常

我正在使用 Reactor 2 和 Spring 4。这是我拥有的典型代码 -Consumer使用存储库

然后我有一个控制器将请求传递给eventBus我传递请求并返回一个Promise

一切正常,但在ApplicationService抛出异常Promise的情况下,未设置值,但我确实在控制台中得到以下信息:

问题是:

  1. 我是否eventBus以错误的方式使用 Reactor?如果是这样,正确的方法是什么

  2. 也许这个功能还没有实现

0 投票
2 回答
896 浏览

java - 使用 Reactor 2.0 在 Spring 4 上执行多线程

我正在尝试将Reactor 2x集成到现有Spring 4应用程序中,以在执行REST请求期间提高性能,其中resources可以相互独立地获取,类似于 map-reduce,我们将作业并行化为多个线程,然后将它们加入缓冲区.

到目前为止,我们有这个示例在非弹簧环境中工作:

在上面的示例中,我们使用 应用转换resourceToMapFunction,然后使用buffer()方法连接 ,创建Promise等待结果并返回result.

我的第一个问题,这是应该使用 Reactor 的方式吗?我知道这些转换是正确应用的,但也许,我是一个新手,Reactor没有以正确的方式使用某些东西。

我的第二个问题,没什么大不了的,但是Reactor项目中是否有任何东西可以按照输入中提供的相同顺序返回resources?由于这是在多个线程中执行的,所以我很确定答案不是,就像我说的那样,我的担心更少,但无论如何都想问。

最后一个问题,当我将此代码引入我的Spring项目时,转换失败,因为Bean应用底层转换的依赖项不在执行线程中,这是我可以使用 Spring Reactor 版本轻松完成的事情吗?如果是这样,是否有任何链接或文档显示如何操作?

非常感谢!

何塞·路易斯

0 投票
0 回答
258 浏览

java - Reactor 2.0 EventBus - 如何聚合和暂停以处理回复

对于如何使用 EventBus 在 Reactor (2.X) 中完成此用例的任何建议,我将不胜感激

主线程使用 threadPoolExecutor 将事件分派到主题“foo”。该事件可能会发送给一些订阅了“foo”的消费者。这些消费者检查事件数据并确定是否存在问题。

我希望主线程暂停并等待满足其中一个条件:

  1. 所有消费者都收到事件,完成运行并且没有发现任何问题。主线程恢复执行
  2. 一位消费者发现问题,主线程应该抛出异常
  3. 其中一个消费者运行时间过长,主线程应恢复执行但记录警告

所以像