问题标签 [rx-java]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 不要在错误时停止 Observable
这是我目前拥有的 Scala 代码:
val b = Observable.interval(1 second).map(n => if (n % 2 == 1) throw new Exception else n*n)
b.subscribe(n => println(n), e => println("error"), () => println("done"))
这是我的输出:
0
错误
如何修改我的 Observable 以便在每个错误之后继续运行,并且我将得到如下输出:
reactive-programming - RxJava:你能给我一个真实的场景来使用 flatMap 而不是 map
我无法真正理解何时使用 flatmap 而不是 map,也找不到一个很好的例子。
你能想出一个好的场景来选择平面地图而不是地图吗?
谢谢。
scala - 如何为“observeOn”方法创建一个“调度程序”?
我在我的 Scala 项目中使用 RxJava,我需要Observable
在一个单独的线程中执行我的。我知道为了实现这一点,我需要observeOn
在它上面调用方法并传递一个实例rx.lang.scala.Scheduler
作为参数。
但是我怎样才能创建那个实例呢?我没有找到任何明显的rx.lang.scala.Scheduler
特征实例化方法。例如,我有这个代码:
Observable.from(List(1,2,3)).observeOn(scheduler)
有人可以提供一个可以解决问题的工作变量示例scheduler
吗?
multithreading - 如何在单独的线程中运行“Observable”?
我在 Scala 项目中使用 RxJava 并说我有这个简单的Observable
:
我永远不会收到“你好”消息,因为while
阻塞了一个线程。如何Observable
在单独的线程中运行以避免阻塞?
====================================
我在想observeOn
可能会有所帮助,但事实并非如此。运行这个:
...仍然不打印“你好”。我猜想添加observeOn
makeOnNext
在单独的线程中调用而不是while
块本身?
====================================
我当然可以换成while
一个Future
:
但也许存在更多的 rx 惯用方式来做到这一点?
reactive-programming - 从 Observable 创建 BehaviorSubject
假设我有一个observableA
从网络获取数据并将其发出的数据,如果我每次尝试从网络请求数据时都订阅这个 observable,那么它会很重。
我想创建 BehaviorSubject 并将其连接到 observableA,以便任何其他线程/对象都将订阅 BehaviorSubject 以获得最新发出的数据。
到目前为止,我无法管理它来编码。我无法创建空的 BehaviorSubject 并在 observableA 中调用,因为它们彼此不相关。我无法订阅 observableA 并获得 BehaviorSubject 作为观察者,知道如何完成它吗?或者甚至更好?
algorithm - 异步、提前退出、串联的 Observable
假设我们有 3 个 observables ,A、B和C。我需要同时运行所有 3 个(异步,对于外行),但是:
- 如果我从A得到任何东西,发出它......不要发出任何其他东西。
- 如果A完成而没有发出任何东西,则将规则 1 应用于B。
- 如果B完成而没有发出任何东西,则从C发出项目。
- 如果C完成时没有发出任何东西,则发出一个默认项。
昨天我花了几个小时试图解决这个问题,而且 RxJava 中似乎没有任何操作组合可以让我做到这一点。
您可以考虑从左到右级联的值:
A --> B --> C
而且,级联被阻塞,而每个级联运行异步并缓存它们的值。
A(无)--> B(无)--> C(无)--> 默认项
需要明确的是,A必须在任何其他观察者发出任何内容之前完成。B 和 C 的逻辑相同,如果 A、B、C 未能发出任何内容,则默认为默认值。
显然这涉及到缓存,我绝对不想重播 observable。我将需要重播缓存的值。每个门都挂着。
该行为与concat()极为相似,只是如果在其之前有排放,则不会释放链的下一部分。
scala - 为什么head不取消订阅
假设您有以下Observable
内容rxjava-scala-0.18.4
第二个断言失败,这意味着head
没有取消订阅。我对 Observables 的理解是错误的还是应该head
在第一个元素发出后取消订阅?
java - RxJava 中的测试主题
我正在查看 Netflix 的一个名为 RxJava 的响应式扩展的 Java 实现。
他们似乎选择将 .NET 实现Subject<T>
为TestSubject<T>
. 有谁知道为什么它被称为TestSubject<T>
which 对我来说会推断它可能不应该被使用?
我看不出如何从类中的方法创建 Observable。例如
android - 在 RxJava 中处理 API 异常
我目前正在尝试围绕 RxJava 进行研究,但是在以优雅的方式处理服务调用异常时遇到了一些麻烦。
基本上,我有一个(改造)服务,它返回一个Observable<ServiceResponse>
. ServiceResponse
定义如下:
现在我想要的是将该通用响应映射到List<Account>
包含在数据 JsonElement 字段中的 a (我假设您不关心Account
对象的外观,所以我不会用它污染帖子)。以下代码非常适合成功案例,但我找不到处理 API 异常的好方法:
有一个更好的方法吗?这确实有效,onError 会向我的观察者触发,我会收到我抛出的错误,但看起来我做的绝对不对。
提前致谢!
编辑:
让我澄清一下我想要实现的目标:
我想要一个可以从 UI 调用的类(例如,Activity、Fragment 或其他)。该类将采用 aObserver<List<Account>>
作为参数,如下所示:
该方法将返回一个订阅,当 UI 被分离/销毁/等时可以取消订阅。
参数化的观察者将处理 onNext 以处理传入帐户列表的成功响应。OnError 将处理任何异常,但也会传递任何 API 异常(例如,如果响应状态!= 200,我们将创建一个 Throwable 并将其传递给 onError)。理想情况下,我不想只是“抛出”异常,我想将它直接传递给观察者。这就是我看到的所有例子。
复杂之处在于我的改造服务返回一个ServiceResponse
对象,所以我的观察者无法订阅它。我想出的最好的方法是在我的观察者周围创建一个观察者包装器,如下所示:
我仍然觉得我没有正确使用它。我之前绝对没有见过其他人使用 ObserverWrapper。也许我不应该使用 RxJava,尽管 SoundCloud 和 Netflix 的人在他们的演示文稿中真的向我推销了它,而且我非常渴望学习它。
java - 如何等待异步 Observable 完成
我正在尝试使用 rxjava 构建示例。该示例应协调 ReactiveWareService 和 ReactiveReviewService 重新运行 WareAndReview 组合。
鉴于我不想返回 Observable,我该如何等待异步 Observable (findReviewsByItem) 完成?