问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
1300 浏览

spring - Spring WebFlux: Reactive MongoDB

I'm new to Spring Reactor, so i want to refactor this simply spring data (on kotlin) method:

Thanks

0 投票
2 回答
151 浏览

rx-java - 流过滤,直到另一个流发射,但一旦第二个流发射,重新发射那些过滤后的发射

我有一个 Foo 流。Foo 发射需要布局 Android 视图(宽度和高度 > 0)。我为此使用 RxBinding

当这个 observable 发射(或完成,因为采取)视图被布置。

我需要的是 fooObservable() 等到 RxView 发出,即视图被布置。我不能只是 fooObservable.filter( width && height > 0 ),因为这会降低发射。我需要的是缓存 foo 的最后一个发射(如果视图没有布局)并在第一个 RxView.preDraws 之后重新发射。,如果是布局的话应该可以正常通过

0 投票
1 回答
586 浏览

rxjs - 将有限流的无限流转换为无限流 - Reactive X

如何在 Reactive x 中(理想情况下使用 RxJava 或 RxJs 中的示例)可以实现这一点?

a是一个无限的事件流,它触发有限sn的事件流,每个事件都应该是无限流的一部分,S同时能够订阅每个sn流(为了进行求和操作),但同时保持流S为无限。

编辑:更具体地说,我提供了我在 Kotlin 中寻找的实现。每 10 秒发出一个事件,该事件映射到 4 个事件的共享有限流。元流被flatMap编辑成正常的无限流。我利用doAfterNext额外订阅每个有限流并打印出结果。

但是我不确定这是否是“纯 RX”方式。

0 投票
1 回答
413 浏览

scala - Akka 流减少为更小的流

我有一个有序的数据流

我想将它转换为形式的聚合流(项目,计数):

我可以在 Akka Streams 中使用哪些运算符?

我已经查看了.groupBy,但它要求我事先知道我不知道的类别数量。此外,我相信它会记住我想避免的所有组。我应该能够在处理后丢弃 (A, 3) 并释放它消耗的资源。

编辑这个问题要求类似的功能,但使用子流。但是,似乎不需要使用 SubFlows,因为我有一个使用statefulMapConcat组合器的解决方案。

0 投票
2 回答
4919 浏览

project-reactor - 如何正确地将 Flux 包装在 Mono 对象中

我有一个返回学生和注册班级详细信息的网络服务。

该类的 DTO 如下:

获取学生的主要 REST 控制器逻辑如下:

问题是,在进行 REST 调用后,我得到以下输出:

我可以通过阻止通量请求来完成所需的输出,然后将输出转换为列表。

我是响应式编程的新手。在此处使用 Flux 和 Mono 以获得所需输出的正确方法是什么?

0 投票
6 回答
50099 浏览

java - 如何正确读取 Flux并将其转换为单个 inputStream

我正在为我的 spring-boot 应用程序使用WebClient和自定义类BodyExtractor

BodyExtractor.java

上面的代码适用于小有效载荷,但不适用于大有效载荷,我认为这是因为我只读取单个通量值next我不确定如何组合和读取所有dataBuffer.

我是反应堆的新手,所以我不知道很多关于通量/单声道的技巧。

0 投票
1 回答
108 浏览

kotlin - 将 publishOn 与自定义 Publisher 一起使用时的 ReactiveStreams NPE

当我将 Reactive Streams ( https://github.com/reactor/reactor-core ) 与自定义功能Publisher结合使用时publishOn,我总是得到一个 NPE。我的代码有什么问题?我是否Publisher以错误的方式使用?

例外是:

0 投票
2 回答
868 浏览

java - Flow SubmissionPublisher 提供方法的 Java 9 行为

我一直在玩 Java Flowoffer运算符,但是在阅读了文档并进行了测试后,我不明白。

这是我的测试

报价运算符接收要发出的项目和 BiPredicate 函数,据我了解阅读文档,只有在谓词函数为真的情况下才会发出项目。

Bur通过测试后的结果是

如果我返回 true 而不是 false,则结果没有变化。

任何人都可以更好地解释我这个操作员。

0 投票
1 回答
113 浏览

java - Java 9 中向量/流数据的异步、可组合返回值

假设我有一个 API,它会根据一些查询条件查找或构造一个小部件:

(同步)客户端代码如下所示:

现在说查找或构建一个小部件的成本出人意料,我不希望客户在等待它时阻塞。所以我把它改成:

然后客户端可以写:

或者:

现在,假设同步 API 可以返回0 到 n 个小部件:

天真地,我可以写:

然而,这实际上并没有使代码成为非阻塞的,它只是推动阻塞——要么是Future块直到所有Widgets可用的块,要么是迭代Stream等待每个块的代码Widget。我想要的是让我在每个小部件到达时处理它们的东西,例如:

但这不提供错误处理,即使我添加了一个额外的Consumer<Throwable> errorHandler,它也不允许我将我的小部件检索与其他查询组合起来,或者转换结果。

所以我正在寻找一些可组合的东西,它结合了 a 的特性Stream(可迭代性、可转换性)和 a 的特性CompletableFuture(异步结果和错误处理)。(而且,当我们这样做时,背压可能会很好。)

这是java.util.concurrent.Flow.Publisher吗?一个io.reactivex.Observable?更复杂的东西?更简单的东西?

0 投票
0 回答
890 浏览

spring-boot - Spring boot Mongo Reactive Repository 保存文档时出现问题。错误:[找不到类 com.mongodb.DBRef 的编解码器]

我一直在开发一个 Spring Boot Web 应用程序,我决定使用 ReactiveMongoRepository 来持久化数据。但是,当我保存文档时,我观察到一个异常,我在过去几天一直在努力解决它,但没有运气。例外是:

请参考下面我的代码类和属性文件:

构建.gradle:

应用程序属性

应用程序.java

人.java

PersonDetails.java

PersonRespository.java

PersonDetailsRespository.java

在运行此应用程序时,会发生上述异常,我可以看到它使用的是 mongodb 驱动程序版本 mongodb-driver-core-3.4.2.jar。我注意到它在 PersonDetails.java 中的 Person 类上的 @DBref 有问题,如果我从 Application 中评论以下行,那么它可以工作

提前致谢。吉丁