问题标签 [reactivex]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
javascript - 如何在 RxJS 中完成 Observable
假设我们有一个Observable:
我怎样才能使它完成(什么会触发所有订阅的观察者的onComplete事件)?
java - Observable 中的 if/else
我在理解如何使用 Observables 在循环内重构这个 if/else 条件时遇到了一些问题。
这是我的代码:
我想要类似的东西
但我不知道如何表达 if/else。
有什么建议么?
问候。
javascript - ReactiveX:仅分组和缓冲区每个组中的最后一项
如何对 Observable 进行分组,并从每个 GroupedObservable 仅将最后发出的项目保留在内存中?这样每个组的行为就像 BehaviorSubject 一样。
像这样的东西:
所以在内存中,我们只有每个的最后一项user
:
当订阅者订阅时,这两个项目会立即发布(每个项目都有自己的发射)。就像我们每个 BehaviorSubject 一样user
。
onCompleted() 永远不会被触发,因为人们可能会永远聊天。
我事先不知道user
可以有什么价值。
c# - 在reactivex.net 中取消任务
假设我已经存在如下代码:
我尝试将 Rx 应用于前端代码:
它工作正常。
但是,一旦我将订阅绑定到IObservable
,有什么办法可以阻止它继续中途获取数据?处置订阅不会停止枚举。
java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?
背景
我有许多 RxJava Observables(要么从 Jersey 客户端生成,要么使用 stubs 生成Observable.just(someObject)
)。它们都应该只发出一个值。我有一个组件测试,它模拟了所有 Jersey 客户端和使用Observable.just(someObject)
,并且我看到了与运行生产代码时相同的行为。
我有几个类对这些 observables 起作用,执行一些计算(以及一些副作用——我以后可能会让它们直接返回值)并返回空的 void observables。
有一次,在一个这样的课程中,我试图压缩我的几个源 observables 然后映射它们 - 如下所示:
然后将计算类全部组合并等待:
问题
问题是,processToNewObservable()
永远不会被执行。通过消除过程,我可以看到这就是getObservable1()
麻烦所在 - 如果我将其替换为Observable.just(null)
,一切都会按照我的想象执行(但在我想要一个真实的地方使用空值)。
重申一下,在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是在我的测试中getObservable1()
返回的 Mockito 模拟。Observable.just(someValue)
调查
如果我转换getObservable1()
为阻塞,然后将第一个值包装在 中just()
,一切都会按照我的想象执行(但我不想引入阻塞步骤):
我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,并且zip
看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable。但是,我尝试添加.cache()
到我认为相关的每个可观察源,但这并没有改变行为。
我还尝试在 zip 之前在 getObservable1 上添加 next / error / complete / finally 处理程序(不将其转换为阻塞),但它们都没有执行:
问题
我对 RxJava 很陌生,所以我很确定我错过了一些基本的东西。问题是:我能做什么愚蠢的事?如果从我到目前为止所说的内容来看这并不明显,我可以做些什么来帮助诊断问题?
java - vertx httpserver 等待端口监听
我在运行我的服务器的 RC 时遇到问题。在这一次启动之前,我的集成测试正在访问服务器。在我开始测试之前,如何确保服务器的端口实际上正在侦听?
我正在使用 httpServer.listen(port, handle) 功能,但似乎不像 API 指定的那样工作。
有什么建议么?
java - RxJava Android 压缩许多不同类型的请求
我对 Observable.zip 函数有点“审美”问题。你像这样使用它:
请求的数量等于“deal-with-it”函数中的参数数量。现在,如果你有更多的请求,比如 6 个,你最终会得到一个函数,它需要 6 个参数(假设它们都有不同的类型)。就是感觉不干净。有没有办法将它们包装在一个类中,例如属性?
我现在的现实问题是,我使用 zip 来加载设置数据:
- 从 rest api 加载列表视图的项目
- 加载这些项目的 id,这些项目作为收藏夹存储在 db 中
- 合并两个列表,因此最喜欢的项目将 isFavorite > 设置为 true
- 更新列表视图
现在没那么糟糕了。但是我想在#3 处添加 2-3 个对其他数据结束函数的请求,这将增长到具有太多函数参数的 4 行猛犸象。
我想,我可以使用嵌套的 Observable.zip,但可能很危险。有没有更优雅的方式来包装这些参数?
我很高兴看到你的建议。
javascript - 使用 RxJS Observable 流式传输 JSON
我试图了解一些关于 RxJs 的事情。我想做的是使用一些 JSON 数据,并在数据进入时立即开始在 DOM 上呈现这些数据。我已经设置了流请求、响应和显示。它的每一个输出都很好,但它是一次性完成的,而不是随着时间的推移。
我想开始在页面上显示数据,而不是等待整个文件完成然后立即显示,这会产生很长的等待时间。
java - Observables 并行执行
我正在用 reactiveX Zip 做一些实验,我注意到我在我的 zip 中定义的 observables 是一个接一个地按顺序执行的。我认为 zip 的好处是在 zip 中定义的每一个 observable 都由一个线程执行,所以它们都是并行执行的。有什么方法可以实现我想要的吗?这是我的 zip 示例
java - RxJava - 为什么执行器只使用一个线程
我创建了一个固定线程池来处理每 300 毫秒发出的事件,并假设该过程需要 1000 毫秒。假设多线程可以工作,但只有一个线程被重用。
如果我将 sleepTime 设置为小于 300 毫秒,则处理线程会更改,但这没用。
问题:我该怎么做才能使其并发?为什么程序重用线程?
先感谢您
日志