问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
759 浏览

spring - 带有弹簧反应器项目的反应流背压

我已经研究并阅读了他们的文件不是很容易理解。我想要实现的是以下功能:

我正在使用 Spring Reactor 项目并使用 eventBus。我的事件总线正在向模块 A 抛出事件。

模块 A 应该接收事件并插入到将保存唯一值的 Hot Stream。每 250 Milisecons 流应该提取所有值并对它们进行计算......等等。

例如: eventBus 正在抛出事件,编号为:1,2,3,2,3,2

流应获取并保存唯一值 -> 1,2,3 250 毫秒后,流应打印数字和空值

有人知道如何开始吗?我尝试了这些示例,但没有任何效果,我想我不明白。有人有例子吗?

肿瘤坏死因子

编辑:

当试图做下一个我总是得到异常:

例外:

如您所见,如果不通过此问题,我将无法继续尝试。

顺便说一句:这只有在我使用时才会发生,例如buffer(1, TimeUnit.SECONDS)如果我使用buffer(50)它就可以了。虽然这不是最终的解决方案,但它是一个开始。

0 投票
1 回答
42 浏览

spring - 使用每个消费的唯一值消费流

我已将其定义为全局:

在构造函数上:

现在我两次调用这个函数:

独特的作品很好,我第一次消费。当我再次调用此方法时,他仍然记得不同的并且不会触发消耗。

每次我们消费后是否有任何清洁选项。我需要实现的想法是,我将缓冲所有输入,并且每次我只会消耗唯一的项目..

有人有什么想法吗?

肿瘤坏死因子

0 投票
2 回答
4622 浏览

java - Project Reactor - 如何按窗口处理结果

我有以下通量:

我想在doOnNext这一刻我应该能够获得物品块。但.get总是以异常失败Timeout on Mono blocking read

我从日志中看到,之前都已处理过.window,结果日志给出:

官方文档中没有太多信息,所以我想我误解了一些东西,并且window错误地使用了函数。但是我到底做错了什么?

更新

我发现如果我改用这个特殊问题可以避免.doOnSuccess。喜欢:

但真正的问题是,在我的情况下,我需要根据对提供的数据(而不是1)的一些计算返回一个数字。我可以在这里创建一个新Mono的,但无论如何,以后我应该.get它。例如在 final 中.reduce。因此,如果我这样做.reduce(0, (a, b) -> a + b.get()),那么它将在那里失败。

我如何安全地从 Mono 中获得价值?

更新 2

现在我已经删除了 Flux:toList 并自己完成,Mono从窗口后的映射阶段返回。这大概就是应该的样子。

但它无论如何都不起作用,卡在.reduce

请注意,如果我删除.reduce步骤,它会起作用。在这种情况下,Flux提供者的处理在主流程之后.window执行。我无法控制它,甚至无法得到最终结果。这没有任何意义。

0 投票
2 回答
11106 浏览

spring - Spring 5 Reactive 在扩展 Flux/实现 Publisher 并多次调用 s.onNext() 时失败

我刚开始使用新的 Spring 5 响应式支持并想模拟一些异步数据生成,注意到两个错误行为:

1) 多次调用 s.onNext(String):

在这种情况下,堆栈跟踪是:

2) 多次调用 s.onNext(Alert.class -any DTO-):

现在它不会在日志上显示任何错误,但调用者会收到 500 响应代码和内容 '['

日志:

为什么我们不能多次调用 onNext() ?我们怎么能这样做?

注意:我只要调用onNext一次就可以了:

或者

0 投票
1 回答
2580 浏览

spring - Web 反应式编程 - 从 HTTP 客户端的角度来看有什么优势?

让我们假设控制器的这两种情况会延迟生成一些随机数:

1) 反应式 Spring 5 反应式应用程序:

2)传统的Spring MVC DeferredResult

从 HTTP 客户端(浏览器、AJAX 请求)的角度来看,两种方案之间没有任何区别。我的意思是客户端将等到所有结果都发送完毕,并且在提交整个响应之前不会处理它们。

也就是说,尽管 spring web reactive 让我们认为它正在将结果发送回来,因为它们正在生成,但实际上它不会发生这种情况,并且客户端将无法处理结果,直到所有数字都有已生成。

使客户端完全响应的直接方法是使用 WebSockets。

那么,除了很酷的东西(比如漂亮的语义、组合......),考虑到浏览器 HTTP 请求不是响应式的并且等效于使用传统的 DeferredResult,使用 Spring Web Reactive 的意义何在?

0 投票
2 回答
2110 浏览

spring - Spring 5 Web Reactive Programming - 从流数据的 Spring Reactive Controller 解组 JSON 时出现 WebClient ClassCastException

这个问题与这个问题有关我在其中询问了如何从 Reactive Spring Controller 流式传输数据。

正如罗森指出的那样,我们必须使用text/event-stream将流式传输的结果作为服务器发送的事件发回,到目前为止一切都很好。

我有这样的服务:

从浏览器调用它,开始接收 3 个结果,延迟 1 秒。

我想从 WebClient 调用此服务并以这种方式实现它:

这是测试代码:

问题是,当客户端尝试提取结果时,没有一个HttpMessageReader's 可以读取text/event-stream+ Alert.class

例外:

0 投票
1 回答
1470 浏览

spring - 如何配置 reactor-netty 以使用 SSL?

我正在尝试熟悉 Spring 的 Project Reactor ( https://projectreactor.io/ ) 并构建了一个小型应用程序来通过 SSL 对另一个服务进行 REST 调用。我找不到任何方法来配置org.springframework.web.client.reactive.WebClient通过 SSL 发出请求。似乎没有关于此的文档。我正在使用reactor-core3.0.0.RC1 和reactor-netty0.5.0.M3,以及 Spring Framework 5.0.0.M1。有谁知道如何配置reactor-nettySSL 支持?

0 投票
1 回答
420 浏览

project-reactor - 序列化反应堆中的通量

可以序列化 Reactor Flux。例如,我的 Flux 处于某种状态,当前正在处理某个事件。突然服务终止。Flux 的当前状态保存到数据库或文件中。然后在重新启动应用程序时,我只需从该文件/表中获取所有 Flux 并订阅它们以从上一个状态重新启动处理。这在反应堆中可能吗?

0 投票
1 回答
9539 浏览

java - 取消订阅的反应堆方式

我试图弄清楚反应堆项目,我正在寻找一种取消订阅的方法。我知道在进行例如 Flux 的订阅后,我可以获得对可用于发送 onCancel 信号的 Cancellation 对象的引用,但这只是在进行订阅之后,我需要在某种集合中保存该引用。

有没有更好的方法来获取 Cancellation 对象?或者只是取消订阅。也许某种包含对所有活动订阅的引用的地方 - 是的,那会很棒......

0 投票
2 回答
72 浏览

spring - 异步保存实体时未持久化的实体

我有以下代码:

控制器:

事件通知器:

事件消费者:

事实证明,该方法saveMessage从未被调用。我看到它在启动服务器过程中被调用过一次。

我不确定这是与反应堆相关还是与弹簧相关。

编辑:我将保存过程移至服务,没有任何改变

edit2:我记录了服务操作以查看发生了什么:

控制台中的输出是:

没有来自 JPA 的日志,什么都没有!