问题标签 [rx-java]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rx-java - 处理 onError 和继续处理的最佳实践
我是 RxJava 的新手,但我正在将它集成到我正在从事的项目中以帮助我学习它。我遇到了一个关于最佳实践的问题。
我有一个关于如何处理onError
以防止停止Observable
处理的问题。
这是设置:
对于每个我想做 2 个或更多网络请求的用户 ID,我都有一个用户 ID 列表。如果用户 ID 的任何网络请求失败,则该用户 ID 将不会更新并且可以跳过。这不应阻止处理其他用户标识。我确实有一个解决方案,但它涉及嵌套订阅(参见第二个代码块)。我确实看到的一个问题是,如果每个呼叫失败,即使在检测到某个阈值数量失败后,也无法短路并阻止其余呼叫访问网络资源。
有一个更好的方法吗?
在传统代码中:
问题:
伪代码:
以下是 userIds 列表和每 2 个信息请求的工作示例。如果你运行它,你会看到它会失败(见下面的源代码)
输出:
嵌套订阅解决方案:
输出:
如您所见,只有失败的单个用户 ID 停止了他们的单独处理,但其余用户 ID 已被处理。
只是寻求建议,看看这个解决方案是否有意义,如果没有,最好的做法是什么。
谢谢,亚历克斯
rx-java - 使用 RxJava 进行透明的身份验证令牌刷新
我希望在我的 Java 应用程序中使用 Observable 包装经过身份验证的调用,如果有必要,它将在发出请求之前重新授权我的用户。我使用Shape Code Blog中的这种方法在 Objective-C 中使用 ReactiveCocoa 做了类似的事情:
RACSignal 是可观察的 ReactiveCocoa 类似物。此方法创建一个 RACSignal,它将捕获初始请求发出的任何错误信号,如果该错误与给定条件匹配(在这种情况下,访问被拒绝并且有可用的身份验证令牌),则透明地触发一个新的 RACSignal 以使用刷新令牌然后重新发送原始信号。
RxJava 是否提供类似的功能?catch:
我在Combining Observables文档中找不到类似的东西。
java - 将代码重构为响应式风格
我正在开发的一个应用程序大量使用了异步处理,我正在寻找一种更好的方式来组织代码。
在 servlet 上接收系统的外部输入。此 servlet 收集的原始数据存放在队列中。线程池针对该队列运行,并将原始数据解析为结构化记录,然后将其存入一组 N 个队列中的一个。选择队列使得所有同类记录都进入同一个队列。这 N 个队列每个都由一个线程服务,该线程将相同类型的记录收集到一个集合中。每分钟一个计划任务唤醒并将前一分钟收集的每种类型的所有记录写入文件。
目前,此代码使用一堆队列、线程池和不断运行的可运行文件来组织,这使得逻辑难以遵循。我想将此代码重构为上述数据流更明确的内容。我正在寻找实现这一目标的工具和方法。
- RxJava 之类的工具对此有帮助吗?如果是这样,怎么做?
- 我应该考虑其他方法吗?
mapping - RxJava Subject 作为管道
我想使用Subject在不使用map或flatMap的情况下在一个 Observable 到下一个 Observable 之间创建本质上的管道,因为使用它们非常冗长。
我不知道如何用 Subject 做到这一点,尽管这似乎是正确的方法,因为 Subject (根据文档):
既充当订阅者又充当 Observable
签名Subject<T,R>
在它是一个Observable<R>
和一个的地方Observer<T>
。这意味着我应该能够通过管道T
传输到R
.
这是它在代码中的基本外观:
java - 累积来自不同 Streams 的 Observables 进行轮询
我正在尝试从服务器调用中积累 Observables,然后对它们进行平面映射并进行另一个服务器调用。
Observable 的累积(“add”)如何实现?
scala - RxScala 中 FRP 的 Event 和 Behavior 对应的概念是什么?
通过查看RxScala的 Scaladoc,我不清楚如何将Signal、Event 和 Behavior的概念从 FRP 映射到 RxScala。
我还参加了 Coursera Reactive 课程,但没有解释 FRP(如Conal Elliott所述)和 RxScala 之间的联系。我模糊的感觉是 RxScala 中的 Observable 对应于 FRP 中的行为/信号,但我不太确定。
有人可以解释一下如何将 Conal 的 FRP(信号、信号转换器、事件、行为)的概念映射到 RxScala 中定义的概念/类(Future/Observable/Scheduler/Subject)吗?
event-handling - EventBus/PubSub 与(响应式扩展)RX 在单线程应用程序中的代码清晰度
目前,我正在使用带有 Scala(和 JavaFX)的 EventBus/ PubSub架构/模式来实现一个简单的笔记组织应用程序(有点像带有一些附加思维导图功能的 Evernote 客户端),我不得不说我真的很喜欢 EventBus观察者模式。
以下是一些 EventBus 库:
https://code.google.com/p/guava-libraries/wiki/EventBusExplained
http://eventbus.org(目前似乎已关闭)这是我在实施中使用的。
http://greenrobot.github.io/EventBus/
这是 EventBus 库的比较:http ://codeblock.engio.net/37/
EventBus 与发布-订阅模式有关。
然而 !
最近,我参加了Coursera 的 Reactive 课程,并开始怀疑使用RXJava代替 EventBus 是否会在单线程应用程序中进一步简化事件处理代码?
我想问一下使用这两种技术(某种事件总线库和某种形式的响应式扩展(RX))编程的人的经验:使用 RX 解决事件处理复杂性是否比使用给定的事件总线架构更容易没有必要使用多个线程?
我之所以这么问是因为我在 Coursera 上的反应式讲座中听说RX导致的代码比使用观察者模式更简洁(即没有“回调地狱”),但是我没有发现 EventBus 架构与RXJava之间的任何比较。所以很明显,EventBus 和 RXJava 都比观察者模式更好,但 在代码清晰度和可维护性方面哪个在单线程应用程序中更好?
如果我理解正确的话, RXJava的主要卖点是如果存在阻塞操作(例如等待服务器响应),它可以用来生成响应式应用程序。
但我根本不关心异步性,我关心的只是在单线程应用程序中保持代码干净、清晰且易于推理。
在这种情况下,使用 RXJava 是否比使用 EventBus 更好?
我认为 EventBus 将是一个更简单和更清洁的解决方案,我看不出有任何理由为什么我应该将 RXJava 用于单线程应用程序以支持简单的 EventBus 架构。
但我可能错了!
如果我错了,请纠正我,并解释为什么在没有执行阻塞操作的单线程应用程序的情况下,RXJava 会比简单的 EventBus 更好。
java - RxJava 函数式编程:如何使用适当的参数调用匿名函数
RxJava 的 startFuture(...) 接受一个Func0
(不带参数的函数)返回一个 Future。
但是,我有一个带有 3 个参数的函数,可以表示为Func3<type, type, type, Future>
. 我编写了这个函数,它应该关闭 Func3 并将其作为 Func0 的结果返回:
假设我已经为“type 1”、“type 2”等插入了适当的类型,我怎样才能访问 的参数f
以便我可以“关闭”它们?关于 RxJava 的功能部分,我真的找不到任何文档,我查看了源代码,但还没有找到任何地方。
rx-java - 使用 RxJava 构建异步 REST API
查看 RxJava 为我们的 API 构建异步支持。现在我们使用 jetty + JAX-RS @Path 注释并且不确定将传入的 REST api 调用绑定到 RxJava API 的正确方法是什么。
基本上这是在释放请求线程的上下文中,直到数据库的响应准备好。
查看 Vert.x,但这需要 java 7,我们现在与 java 6 绑定。
寻找有关上述建议。将传入的 http 请求绑定到 RxJava API 的典型方法是什么?
java - RxJava 中的笛卡尔积
是否有可能在 rxjava 中获得两个 Observable 的笛卡尔积?
像这样的东西: