问题标签 [project-reactor]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - 带有弹簧反应器项目的反应流背压
我已经研究并阅读了他们的文件不是很容易理解。我想要实现的是以下功能:
我正在使用 Spring Reactor 项目并使用 eventBus。我的事件总线正在向模块 A 抛出事件。
模块 A 应该接收事件并插入到将保存唯一值的 Hot Stream。每 250 Milisecons 流应该提取所有值并对它们进行计算......等等。
例如: eventBus 正在抛出事件,编号为:1,2,3,2,3,2
流应获取并保存唯一值 -> 1,2,3 250 毫秒后,流应打印数字和空值
有人知道如何开始吗?我尝试了这些示例,但没有任何效果,我想我不明白。有人有例子吗?
肿瘤坏死因子
编辑:
当试图做下一个我总是得到异常:
例外:
如您所见,如果不通过此问题,我将无法继续尝试。
顺便说一句:这只有在我使用时才会发生,例如buffer(1, TimeUnit.SECONDS)
如果我使用buffer(50)
它就可以了。虽然这不是最终的解决方案,但它是一个开始。
spring - 使用每个消费的唯一值消费流
我已将其定义为全局:
在构造函数上:
现在我两次调用这个函数:
独特的作品很好,我第一次消费。当我再次调用此方法时,他仍然记得不同的并且不会触发消耗。
每次我们消费后是否有任何清洁选项。我需要实现的想法是,我将缓冲所有输入,并且每次我只会消耗唯一的项目..
有人有什么想法吗?
肿瘤坏死因子
java - Project Reactor - 如何按窗口处理结果
我有以下通量:
我想在doOnNext
这一刻我应该能够获得物品块。但.get
总是以异常失败Timeout on Mono blocking read
。
我从日志中看到,之前都已处理过.window
,结果日志给出:
官方文档中没有太多信息,所以我想我误解了一些东西,并且window
错误地使用了函数。但是我到底做错了什么?
更新
我发现如果我改用这个特殊问题可以避免.doOnSuccess
。喜欢:
但真正的问题是,在我的情况下,我需要根据对提供的数据(而不是1
)的一些计算返回一个数字。我可以在这里创建一个新Mono
的,但无论如何,以后我应该.get
它。例如在 final 中.reduce
。因此,如果我这样做.reduce(0, (a, b) -> a + b.get())
,那么它将在那里失败。
我如何安全地从 Mono 中获得价值?
更新 2
现在我已经删除了 Flux:toList 并自己完成,Mono
从窗口后的映射阶段返回。这大概就是应该的样子。
但它无论如何都不起作用,卡在.reduce
:
请注意,如果我删除.reduce
步骤,它会起作用。在这种情况下,Flux
提供者的处理在主流程之后.window
执行。我无法控制它,甚至无法得到最终结果。这没有任何意义。
spring - Spring 5 Reactive 在扩展 Flux/实现 Publisher 并多次调用 s.onNext() 时失败
我刚开始使用新的 Spring 5 响应式支持并想模拟一些异步数据生成,注意到两个错误行为:
1) 多次调用 s.onNext(String):
在这种情况下,堆栈跟踪是:
2) 多次调用 s.onNext(Alert.class -any DTO-):
现在它不会在日志上显示任何错误,但调用者会收到 500 响应代码和内容 '['。
日志:
为什么我们不能多次调用 onNext() ?我们怎么能这样做?
注意:我只要调用onNext
一次就可以了:
或者
spring - Web 反应式编程 - 从 HTTP 客户端的角度来看有什么优势?
让我们假设控制器的这两种情况会延迟生成一些随机数:
1) 反应式 Spring 5 反应式应用程序:
2)传统的Spring MVC DeferredResult
:
从 HTTP 客户端(浏览器、AJAX 请求)的角度来看,两种方案之间没有任何区别。我的意思是客户端将等到所有结果都发送完毕,并且在提交整个响应之前不会处理它们。
也就是说,尽管 spring web reactive 让我们认为它正在将结果发送回来,因为它们正在生成,但实际上它不会发生这种情况,并且客户端将无法处理结果,直到所有数字都有已生成。
使客户端完全响应的直接方法是使用 WebSockets。
那么,除了很酷的东西(比如漂亮的语义、组合......),考虑到浏览器 HTTP 请求不是响应式的并且等效于使用传统的 DeferredResult,使用 Spring Web Reactive 的意义何在?
spring - Spring 5 Web Reactive Programming - 从流数据的 Spring Reactive Controller 解组 JSON 时出现 WebClient ClassCastException
这个问题与这个问题有关,我在其中询问了如何从 Reactive Spring Controller 流式传输数据。
正如罗森指出的那样,我们必须使用text/event-stream
将流式传输的结果作为服务器发送的事件发回,到目前为止一切都很好。
我有这样的服务:
从浏览器调用它,开始接收 3 个结果,延迟 1 秒。
我想从 WebClient 调用此服务并以这种方式实现它:
这是测试代码:
问题是,当客户端尝试提取结果时,没有一个HttpMessageReader'
s 可以读取text/event-stream
+ Alert.class
。
例外:
spring - 如何配置 reactor-netty 以使用 SSL?
我正在尝试熟悉 Spring 的 Project Reactor ( https://projectreactor.io/ ) 并构建了一个小型应用程序来通过 SSL 对另一个服务进行 REST 调用。我找不到任何方法来配置org.springframework.web.client.reactive.WebClient
通过 SSL 发出请求。似乎没有关于此的文档。我正在使用reactor-core
3.0.0.RC1 和reactor-netty
0.5.0.M3,以及 Spring Framework 5.0.0.M1。有谁知道如何配置reactor-netty
SSL 支持?
project-reactor - 序列化反应堆中的通量
可以序列化 Reactor Flux。例如,我的 Flux 处于某种状态,当前正在处理某个事件。突然服务终止。Flux 的当前状态保存到数据库或文件中。然后在重新启动应用程序时,我只需从该文件/表中获取所有 Flux 并订阅它们以从上一个状态重新启动处理。这在反应堆中可能吗?
java - 取消订阅的反应堆方式
我试图弄清楚反应堆项目,我正在寻找一种取消订阅的方法。我知道在进行例如 Flux 的订阅后,我可以获得对可用于发送 onCancel 信号的 Cancellation 对象的引用,但这只是在进行订阅之后,我需要在某种集合中保存该引用。
有没有更好的方法来获取 Cancellation 对象?或者只是取消订阅。也许某种包含对所有活动订阅的引用的地方 - 是的,那会很棒......
spring - 异步保存实体时未持久化的实体
我有以下代码:
控制器:
事件通知器:
事件消费者:
事实证明,该方法saveMessage
从未被调用。我看到它在启动服务器过程中被调用过一次。
我不确定这是与反应堆相关还是与弹簧相关。
编辑:我将保存过程移至服务,没有任何改变
edit2:我记录了服务操作以查看发生了什么:
控制台中的输出是:
没有来自 JPA 的日志,什么都没有!