问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
401 浏览

spring - Spring 5 M2 - 反应式 - 序列化 ServerSentEvent 时出错

从 Spring 5.0.0 M1 升级到 M2 后,此代码已停止工作(我从更改SseEventServerSentEvent类):

例外是:

0 投票
1 回答
834 浏览

project-reactor - 根据条件拆分事件并按顺序处理

有以下问题:给定具有 partitionId 属性的事件列表(例如 0-10),我希望根据 paritionId 拆分传入事件,以便按接收顺序处理具有相同 partitionId 的事件. 如果分布或多或少均匀,这将导致并行处理 10 个事件(每个分区)。

除了创建 10 个单线程调度器并将事件发送到正确的调度器之外,还有没有办法使用 Project Reactor 完成上述任务?

谢谢。

0 投票
1 回答
4351 浏览

spring - Spring 5 Web Reactive - 热发布 - 如何使用 EmitterProcessor 将 MessageListener 桥接到事件流

示例项目位于此处:https ://github.com/codependent/spring5-playground

我想将从 JMS 队列接收到的消息桥接到响应式控制器中,该控制器会将消息作为事件流发布。

我不希望重播消息,也就是说,如果消息到达并且没有任何订阅者,我不希望在任何订阅者稍后发送它们,所以我使用的是 EmitterProcessor:

这是我的消息监听器:

最后是我的休息控制器:

为了测试它的行为,我添加了这个控制器方法来模拟插入队列:

启动应用程序后立即加载http://localhost:8080/accounts/1/alerts/live2,浏览器等待数据。

然后我发布一些消息http://localhost:8080/mock/accounts/1/alerts/put

但是没有人进入浏览器。这最终以 500 错误(无日志)结束。

经过一些手动重试后,它开始接收数据......

...但很多时候它没有得到任何东西。

0 投票
1 回答
840 浏览

java - 返回响应或错误的 Reactor Eventbus 示例

首先,我对异步处理领域很陌生。在我当前的项目中,我们使用 spring boot 和项目反应器,特别是 Eventbus,来做一些异步处理。我想使用 eventbus 也会使我们的系统更具可扩展性。

到目前为止,EventBus 的使用非常有限,我们在 EventBus 消费者中进行一些处理,但不返回任何内容。配置和示例处理器如下:

//配置文件

//消费者

到目前为止,这对于具有 void 返回类型的接受方法来说还不错。但是,现在我有一个场景,我想从处理器方法返回响应,或者如果在处理时发生错误需要抛出适当的异常,并且在任何一种情况下,都需要将响应/异常返回到调用。

这可以使用反应器完成吗?如果是,请为此提供一个简单的示例。我已阅读有关 Promise 的信息,但找不到与我的案例类似的示例。

0 投票
3 回答
2330 浏览

spring - RxJava 是否有任何异步最新的 http 客户端?

Reactor-core 和 Spring 5 刚刚为其 Mono/Flux Publishers 引入了新的异步 WebClient。

由于使用 Observable 包装 RestTemplate 请求并 subscribeOn 以提供异步性是很常见的,所以我想知道 RxJava 是否有任何异步客户端。

我找到了https://github.com/ReactiveX/RxApacheHttp但它似乎不受支持,它的最后一次提交是从 2014 年开始的,甚至在我的测试中都不起作用。

0 投票
1 回答
1271 浏览

java - Spring Reactor 与事务性;处理故障和重启作业

我们的 Spring Web 应用程序使用带有 Quartz 的 Spring Batch 来执行复杂的工作。这些作业中的大多数都在事务范围内运行,因为如果复杂系统的某个部分发生故障,我们希望回滚任何以前的数据库工作。然后,我们将调查问题、部署修复程序并重新启动服务器。

这将成为一个问题,因为其中一些作业需要进行大量处理并且可能需要很长时间才能运行。随着执行时间开始超过 1 小时,我们发现自己无法为生产环境部署其他问题的修复程序,因为我们不想中断一项重要的工作。

我一直在阅读 Reactor 实现作为我们问题的解决方案。我们可以做一点处理,发布一个事件,并让其他系统根据需要执行适当的操作。甜的!

我唯一的问题是,处理失败的最佳方法是什么?如果我发布了一个事件,而消费者未能执行某些关键功能,它会在稍后重新启动吗?

如果发布了一个事件,并且在所有侦听它的适当消费者可以适当地处理它之前,服务器关闭以进行部署怎么办?

0 投票
2 回答
472 浏览

rx-java - 使用 RxJava 进行基于内容的路由的惯用方式

有没有人知道在 RxJava 或 Reactive Stream 中进行基于内容的路由的惯用方式?

我可以看到如何使用地图过滤器和转换等标准运算符创建反应式消息处理管道。

但我看不出一个惯用的为什么使用运算符在几个可能的管道之间切换。
我希望能够根据一个初始输入对象或映射触发多个处理管道中的一个,以便根据输入映射或对象的内容选择不同的管道。

这个想法是我需要向外部 api 发出几个可能的请求,而决定要触发的 api 和管道的因素取决于我的控制器的输入对象。

我知道这可以通过 Spring 集成或 Camel 来完成,但我的用例不需要集成引擎的全部灵活性,因为它能够处理网络上的延迟。

非常感谢你

0 投票
1 回答
1976 浏览

java - 如何使用 Reactor 处理来自 API 的大型 JSON 响应?

API 使用JSON 对象进行响应,该对象具有一组对象,例如{"items":[{...},{...},...],...我们需要处理的数据。问题是该数组有 + 1.000 个对象,并且端点响应缓慢,因此流式传输需要很多秒才能完成,并且使用我们的“阻塞”编程模型使用Apache HttpClient来使用端点并使用Jackson来序列化响应,我们等待获取完整的主体来解析包含数组的所有对象,而不是在每个元素到来时处理它,节省内存使用时间,因为一旦处理了一个元素,我们可以让 GC 来释放使用的内存由元素。

我如何在数据到来时使用Reactor 项目来消耗响应?更改我们正在使用的 HTTP 客户端和/或序列化程序没有问题。此外,我们将 Java 8 与 Spring MVC 与 Spring 4.3 一起使用,但我们可以将项目移至即将发布的 Spring 版本。

注意:端点是第三方 API,是对 HTTP 1.1 服务器的简单 GET,我们无法更改它的工作方式,例如添加分页或类似的东西。

0 投票
1 回答
58 浏览

websocket - 带有嵌入式 tomcat 反应器错误的 stomp-chat Web 套接字 - 未找到类型 reactor.tuple.Tuple

我在使用带有嵌入式 Tomcat 服务器的 Spring 时让 Web 套接字订阅正常工作时遇到问题。

运行此stomp-chat 应用程序时,我收到以下错误:

我尝试添加以下依赖项:

但它给出了同样的错误。任何帮助,将不胜感激。

0 投票
1 回答
855 浏览

java - Netty HttpServer api 更改/与可用示例不同

Arjen Poutsma 的博客文章Josh Long 的视频示例中的Netty 服务器实例化是通过创建一个reactor.ipc.netty.http.HttpServer实例然后以实例作为参数调用它startstartAndAwait方法来完成的。ReactorHttpHandlerAdapter

然而,API 似乎现在已经发生了变化startstartAndAwait方法现在需要一个具有以下签名的 lambda:

项目依赖项及其版本与 Arjen Poutsma 的示例项目中的相同

使用 spring reactor 支持实例化 netty 服务器的新/正确方法是什么?