问题标签 [rx-java]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Scala 流媒体库差异(Reactive Streams/Iteratee/RxScala/Scalaz...)
我正在关注 Coursera 上的 Scala 函数式反应式编程课程,我们处理 RxScala Observables(基于 RxJava)。
据我所知,Play Iteratee 的库看起来有点像 RxScala Observables,其中 Observables 有点像 Enumerators,Observers 有点像 Iteratees。
还有 Scalaz Stream 库,也许还有其他一些?
所以我想知道所有这些库之间的主要区别。在哪种情况下,一个可能比另一个更好?
PS:我想知道为什么 Martin Odersky 没有为他的课程选择 Play Iteratees 库,因为 Play 在 Typesafe 堆栈中。这是否意味着 Martin 更喜欢 RxScala 而不是 Play Iteratees?
编辑: Reactive Streams计划刚刚宣布,作为尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure
java - 从普通 Java 事件创建 Observable
Observable
从经典 Java 事件模式创建 Rx-Java 的最佳方法是什么?也就是说,给定
我要实施
我想出的实现是:
但是,我真的不喜欢它:
它非常冗长;
每个都需要一个监听
Observer
器(理想情况下,如果没有观察者,则应该没有监听器,否则应该有一个监听器)。这可以通过将观察者计数保持为 中的字段,OnSubscribeFunc
在订阅时增加它并在取消订阅时减少来改进。
有更好的解决方案吗?
要求:
使用现有的事件模式实现而不更改它们(如果我正在控制该代码,我已经可以编写它以返回
Observable
我需要的)。如果/当源 API 更改时出现编译器错误。不能使用
Object
实际事件参数类型或属性名称字符串来代替。
java - 如何在时间间隔内将自定义类用作 Observable 和触发方法
我有一个类Producer
,简化它有public Object readData()
我想让这个类成为Observable
(RxJava)的方法。
如何指示应该调用哪个方法?我需要将我的Producer
课程转换为Future
orIterable
吗?
下一个问题是readData
应该每 n 秒调用一次。某些方法,例如from,具有调度程序参数,但我找不到任何如何应用它的示例。我找到了间隔方法,但它会发出一个整数序列。到目前为止,没有 Observable 我使用Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....)
java - RxJava——终止无限流
我正在探索反应式编程和 RxJava。这很有趣,但我被困在一个我找不到答案的问题上。我的基本问题是:终止原本无限运行的 Observable 的响应式方法是什么?我也欢迎对我的代码提出批评和反应式最佳实践。
作为练习,我正在编写一个日志文件尾部实用程序。日志文件中的行流由Observable<String>
. 为了BufferedReader
继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null
终止检查,而是将其解释为我的线程应该休眠并等待更多记录器文本。
但是,虽然我可以使用 终止观察者takeUntil
,但我需要找到一种干净的方法来终止原本无限运行的文件观察者。我可以编写自己的terminateWatcher
方法/字段,但这会破坏 Observable/Observer 封装——我希望尽可能严格地遵守反应式范式。
这是Observable<String>
代码:
这是在新行出现时打印新行的 Observer 代码:
我的两个问题是:
- 什么是终止原本无限运行的流的反应一致的方法?
- 我的代码中还有哪些其他错误让您哭泣?:)
scala - scala 无限 rx 可观察创建 - 如何正确执行此操作?
我最近开始玩rxjava-scala,我想创建一个(可能)无限流可观察。查看 github 上的代码和打开的问题,我发现“开箱即用”的解决方案尚未实现(问题中的 usecase06 说它甚至没有为 java 实现)。
所以,我试图提出我自己的实现。考虑以下:
和一个辅助方法:
和示例核心:
这似乎工作正常,但我对此不满意。首先,我正在创建一个新的Thread
,这可能很糟糕。但即使我使用某种线程池,它仍然会感觉不对。所以我想我应该使用调度程序,这听起来像是一个合适的解决方案,只是我不知道如何在这种情况下使用它。我尝试rx.lang.scala.concurrency.Schedulers.threadPoolForIO
使用该observeOn
方法进行补充,但似乎我做错了。observable 的代码不会用它编译。任何帮助将不胜感激。谢谢!
reactive-programming - 如何在 RxJava 中计算移动平均线
在金融领域,我们通常需要从一个时间序列数据流中计算出移动窗口聚合值,以移动平均为例,假设我们有以下数据流(T是时间戳,V是实际值):
从我们得到的流中计算移动平均值 3:
要计算移动平均线,我们似乎可以这样做:
- 从原始流构建一个 Observable
- 通过将值聚合到组中,从原始流构建一个 Observable
- 在步骤 2 中使用聚合运算符计算 Observable 的最终结果。
步骤 1 和 3 实现起来很简单,但是,对于步骤 2,当前的 RxJava 似乎没有内置运算符来生成移动窗口组,window/groupBy 运算符似乎不适合这种情况,我没有找到一种从现有运营商组成解决方案的简单方法,有人可以建议如何在 RxJava 中以“优雅”的方式做到这一点吗?
scala - 什么是检查可观察对象是否完成的好方法
我想知道是否有一种方便的方法来检查 observable 是否已完成。例如我有一个测试
该recovered
方法返回一个 Observable[Try[T]] 并且是标准 Observable 的扩展。我想在源 Observable 完成时检查 Observable[Try[T]] 是否完成。
因此,我编写了一个带有主题的测试,我向其中发布了一些值,然后最终完成。有没有一种简单的方法可以检查 newOb 是否也已完成?Observable 中没有类似 isCompleted 的方法。
java - Android Studio 添加 rxjava 库
考虑以下项目结构:
我从http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20a%3A%22rxjava-core%22 (sources.jar ) - 但我也尝试过其他人
然后我在 SubProject 中创建了 lib 文件夹,然后将 .jar 放入其中。
在 Android Studio 中,我右键单击库并选择“添加为库...”,级别:“项目库”和模块:“子项目”。
rxjava 使用包名“rx”。我实现了一些导入这个包的代码:
构建项目时发生以下错误:
我发现需要在 SubProject/build.grandle 中添加一行:
但随后它抛出:
我试图在项目结构中移动 .jar,但到目前为止还没有运气。
如何正确地将 3rd 方库添加到项目中?我自己创建“libs”文件夹可以吗?
reactive-programming - Vert.x - RxJava - zip without returning a new observable
I am looking for a method in RxJava that behaves like the zip method, but without returning any new observable.
I have two observables, one emitting RxMessage objects and the other one RxHttpClientResponse objects. I want some kind of method that receives both objects and executes a function. That's all, without returning a new observable that emits new elements.
Any suggestion?
Thanks in advance