问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
117 浏览

reactor - 遮光反应器

我正在尝试遮蔽反应堆(https://github.com/reactor/reactor)并且在启动事件总线时遇到了问题。我收到以下错误。非常感谢任何有关对该项目进行着色的建议

0 投票
1 回答
285 浏览

java - What's a Listener's subject in Reactor?

I've a simple Listener pattern implementation which (in a condensed form) looks like this:

So in essence this grabs email from my inbox, extracts information from each email and converts that into an internal message format.

A typical Consumer would then persist that message, another would display the information on a HTML page, and yet another might trigger 3rd party systems to do some action. All of this works fine.

Now, I'm trying to get some more resiliency in face of a Consumer failure. For example, I want my EmailMessage to live on if the persistence consumer failed (threw an exception), ending processing. In that case, I want processing to be retried after a pause interval. Right now, if a Consumer fails, the message would be lost.

I recently found out about Reactor, and I think it's orchestration would save the message and enable me to do what I need. However, I fail to see how I would adapt my current Runnable into it's model. From the documentation it seems like I need a RingBufferProcessor to hold my messages. I can't find another way then to pass the processor to my Runnable for it call the onNext() method instead of the separate listener's accept method(s). Am I missing something or is this the way it's supposed to work?

(Extra cookies for showing me how I retry in the face of consumer failure)

0 投票
1 回答
107 浏览

reactor - project-reactor - 使用故障安全集成测试加载 UUIDUtils 时的 Nullpointer

在运行具有故障保护的反应堆 EventBus 的集成测试时,我遇到了 NP

我认为这是因为 UUIDUtils 类正在由引导类加载器加载,因此对 getClassLoader() 的调用返回 null? Caused by: java.lang.NullPointerException at reactor.core.support.UUIDUtils.<clinit>(UUIDUtils.java:39)

IS_THREADLOCALRANDOM_AVAILABLE = null != UUIDUtils.class.getClassLoader().loadClass( "java.util.concurrent.ThreadLocalRandom" );

0 投票
0 回答
290 浏览

java - 如何为分布式服务配置 Reactor?

我在专用网络上分布了许多服务,并且有一项服务必须向所有其他服务广播消息(例如身份验证令牌的到期)。我的应用程序基于 java Spring 框架,自从我开始熟悉满足我条件的架构后,我发现它被称为事件驱动架构,与该架构相关的两个著名框架是 AKKA 和 Reactor。但是阅读 reactor 教程(与 spring 集成比 AKKA 好得多)我发现包括 Producer 和 Consumer 在内的所有服务都必须提交到同一上下文中的 Reactor 类中。现在我想知道是否有一种方法可以在多台机器上分发服务。

0 投票
2 回答
3886 浏览

java - Reactor 中 `groupBy` 组的并行调度

我正在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体相关联并包含一些数据。

与不同实体相关的消息可以并行处理。但是,与任何单个实体有关的消息必须一次处理一个,即实体的消息 2 的处理要"abc"等到实体的消息 1 的处理完成后才能开始"abc"。在处理消息时,应该缓冲该实体的进一步消息。其他实体的消息可以畅通无阻地进行。可以将其想象为每个实体都在线程上运行这样的代码:

如何在不阻塞的情况下使用 React 实现这一点?总消息率可能很高,但每个实体的消息率会非常低。实体集可能非常大,不一定事先知道。

我想它可能看起来像这样,但我不知道。

0 投票
0 回答
26 浏览

java - 当一个事件被触发时,它使用的内存会发生什么?没有注册处理程序?

Reactor-EventBus 事件,尤其是内存本身及其数据负载在它被触发并且没有注册处理程序时会发生什么?

会被处理掉吗?或者那些 Event 对象会被收集到 ringbuffer 中吗?

我的猜测是它将被处置,但我想确定这一点。

0 投票
0 回答
717 浏览

java - 弹簧集成与反应流

我正在做一个 ETL 项目。我已经使用弹簧集成很长时间了。数据源当前是文件或编年史,但它可能会更改为实时流,并且数量可能会增长。未来有可能转向大数据解决方案(hadoop、spark 等)。

基于此,我需要在 spring 集成和反应流之间进行比较?为什么有人会使用一个而不是另一个(或者我首先试图比较两者是错的)?您认为它们可以一起使用的场景(如果有的话)?

0 投票
0 回答
464 浏览

spring - Spring、HandleAfterCreate、Reactor、EventBus IllegalArgumentException

我想在捕获事件后生成一个单独的线程(使用Reactor或) 。我正面临着,到底发生了什么?我进行了很多搜索,发现调度程序不允许复制或存储已调度的对象。这是什么意思?请帮忙。EventBus@HandleAfterCreateSpringBootApplicationIllegalStateException

异常消息

我的环境

  • org.springframework.boot:spring-boot-starter-data-rest
  • spring-boot-starter-data-jpa:1.2.6
  • com.h2 数据库:h2:1.4.189
  • org.projectreactor:reactor-spring:1.0.0
0 投票
1 回答
77 浏览

java - Java Streams 批处理组合调用

我今天的问题是关于构建微服务时的组合操作。

让我们使用虚构的场景:我想构建一个仪表板。仪表板由一群人和他们的信息(历史、评论、购买、最后搜索的产品)组成。

阅读spring-cloud和spring-reactor,我想要一个调用多个微服务的非阻塞解决方案:用户服务,评论服务,搜索引擎服务,......

我的第一个猜测是做类似的事情

  • 加载用户,
  • 然后为每个人加载其评论
  • 然后加载它的历史
  • 合并所有数据

在伪代码中类似于loadUsers().flatmap(u -> loadReviews(u))....reduce(). 正如你所看到的,这里真的很近似。

当加载 1 个用户时,我们可以估计还需要 4 个 http 调用。对于 100 个用户,400 个额外的呼叫等。Big-O 似乎不是线性的。

在最坏的情况下,微服务还委托来自 XYZ 微服务的数据加载,那么我们得到:对于 1 个用户 -> N 个调用,包括 1 个审查调用 -> 1 个 XYZ 调用。抱歉,我没有计算 Big-O(二次?)。

为了避免这种情况,我们也许可以加载所有用户,提取他们的 id,用一批 id 调用每个微服务。每个微服务可以一次加载所有数据(可能是由 id 映射的评论列表),原始调用将合并所有这些列表。(一种zip功能)

摘要:我刚刚阅读了这个关于 Observables 组合的问题。我的问题可以概括为“当您在链的开始没有唯一用户但有数百名用户时,您是否使用相同的策略?” (性能可能不是问题吗?)

0 投票
1 回答
196 浏览

java - Spring XD 流中的异常处理

如何创建一个故障安全的 Spring XD 流,它会在一条特定消息触发异常后继续正常运行(即记录错误但继续使用流中的下一条消息),而无需在每个消息中添加 try catch(Throwable)流步骤?

使用 Reactor 或 RxJava 模型有什么简单的方法吗?

使用 Reactor 的示例流: