问题标签 [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
reactive-programming - Flux 和 Mono 中的 compose() vs. transform() vs. as() vs. map()
最近,我决定用projectreactor.io (io.projectreactor:3.1.1)尝试 spring 5 。
有谁知道使用此功能的最佳情况是什么?使用它们各自的优缺点以及应该在哪里使用它们?
好的例子会有所帮助。
reactive-programming - 对 java.util.concurrent.Flow.Processor 的良好实现/支持
最近发现projectreactor.io对Publisher的支持不错:
对处理器有什么好的支持吗? 我的意思是类似或类似的东西:
如果没有,我该如何实现自己的,或者为什么我不能这样做?
更新1:
经过讨论(见评论),似乎在我的用例中我需要使用flatMap
(见答案),我的问题是处理器的良好实现,我的意思是一些功能,如果它失败了,我可以控制并发出错误。我认为flatMap
会给你足够的功能。就我而言,我使用了:
scala - 在构建源作为对数据的响应时消除内部收集
我有一个流(createDataPointFlow),它是通过执行一个mapAsync
收集数据点(通过Sink.seq
)构建的,否则我想直接流式传输(即不先收集)。
但是,对我来说,如何在不收集项目的情况下做到这一点并不明显,似乎我需要某种机制来将我的项目直接发布到我正在创建的流程的输出部分,但我是新手,不要不知道如何在没有明确参与者参与的情况下做到这一点,我想避免这种情况。
我怎样才能在不需要先将东西收集到 Sink 的情况下实现这一点?请记住,我想要实现的是完全流式传输,而无需进行显式缓冲Sink.seq(...)
。
java - 左外连接两个带有项目反应器的排序稀疏序列
给定两个具有稀疏 ID 的实体流。让我们将它们建模为:
实现一个组成管道的函数,该管道执行 SQL 中称为 FULL OUTER JOIN 的操作。这样最后调用如下代码:
产生类似于以下的结果:
不知道 .join() 是否可以使用,尝试过 .zip() 但它不会按 ID 映射它们,并在第一个序列用完元素时停止。我知道 .bufferUntil() 可以使用,但正在寻找其他一些选项,最好是我缺少的一些本机支持。任何关于如何有效实施它的想法都非常受欢迎。
scala - 同一流中的多个接收器
我有一个这样的流和两个接收器,但一次只使用一个:
或者
我们使用哪个接收器是可配置的,但是如果我同时使用两个接收器怎么办。我怎样才能做到这一点?
我考虑过 Sink.combine,但它还需要一个合并策略,我不想以任何方式组合这些接收器的结果。我并不真正关心它们,所以我只想通过 HTTP 将相同的数据发送到某个端点,同时将它们发送到数据库。Sink combine 与广播非常相似,但从头开始实现广播会降低代码的可读性,现在我只有简单的源、流和接收器,没有低级图形阶段。
你知道如何做到这一点的正确方法(有背压和其他我只使用一个水槽的东西)吗?
http - HTTP协议如何实现从HTTP服务器到客户端的异步数据流发送?
流行的 HTTP 服务器或框架如何使用 HTTP 协议来实现从 HTTP 服务器到 HTTP 客户端的异步数据流?(客户端可以是浏览器或非浏览器)
延迟可能是不确定的
x 是对服务器和客户端有意义的单个数据对象。
只是强调一下,我不是在寻找流的实现(例如反应流、RxJava 等),但我想知道如何使用 HTTP 协议来实现这种异步数据流(不是视频流,而是比如说,json流)。例如,他们使用哪些 HTTP 标头,使用哪种连接等。
reactive-programming - 如何将消息传递给另一个订阅者?
我正在学习 Java 9 的 Reactive Stream API。
因为它有发布者和订阅者,订阅者订阅发布者,订阅者也实现了以下消息:
我没有在订阅者中找到任何将消息传递/传输给另一个订阅者的方法。有什么建议么?
reactive-programming - 订阅者可以充当发布者吗?
就反应式流而言,有一个发布者,它可以有尽可能多的订阅者。
但是假设,订阅者从发布者那里得到了一条消息。现在这个订阅者(比如 Subs1)更改/修改消息并将其传递给其他订阅者(比如 Subs2),后者使用修改后的消息。
那么这个 Subs1 订阅者可以充当发布者,可以将消息传递给新的 Subs2 订阅者吗?
我不确定它是否可能,但我认为这种情况是可能的。
如果可能,请提出一种可能的方法来做到这一点。
spring-boot - 是否可以在 Spring WebFlux BodyInserters 中使用使用 Java 9 Reactive Stream 创建的 Publisher?
我正在学习使用 Java 9 中的 Reactive Streams 并在 Spring 5 中使用 org.reactivestreams 的反应式编程。
我正在使用 webflux api spring 5 和 spring boot2 创建一个 WebClient。
现在,当我们使用 webclient 设置请求的主体时,我们可以在 BodyInserters 方法中传递一个 Reactive Stream(具体来说是 spring 5 中的 org.reactivestreams),如下所示:
并来自 fromPublisher 方法定义:
返回写入给定 Publisher 的 BodyInserter。
参数:publisher 发布者要流式传输到响应体 elementClass 发布者中包含的元素的类
现在,发布者和订阅者也在 java 9 反应流中,我创建的发布者之一是:
现在我的问题是:是否可以在 BodyInserters 的 fromPublisher 方法中使用来自 TestReactiveStreams 的发布者?
我尝试了以下方法:
但这给出了以下编译时错误:
请建议,我们怎样才能做到这一点?
reactive-programming - 响应式流中的 Mono vs Flux
根据文档:
Flux 是一个可以发出 0..N 个元素的流:
Mono 是 0..1 个元素的流:
并且两者都是 Publisher 接口在反应流中的实现。
我们不能在大多数情况下只使用 Flux,因为它也可以发出 0..1,从而满足 Mono 的条件吗?
还是有一些特定情况只需要使用 Mono 而 Flux 无法处理这些操作?请建议。