问题标签 [reactivex]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
47645 浏览

javascript - 如何在 RxJS 中完成 Observable

假设我们有一个Observable

我怎样才能使它完成(什么会触发所有订阅的观察者的onComplete事件)?

0 投票
1 回答
195 浏览

java - Observable 中的 if/else

我在理解如何使用 Observables 在循环内重构这个 if/else 条件时遇到了一些问题。

这是我的代码:

我想要类似的东西

但我不知道如何表达 if/else。

有什么建议么?

问候。

0 投票
2 回答
1929 浏览

javascript - ReactiveX:仅分组和缓冲区每个组中的最后一项

如何对 Observable 进行分组,并从每个 GroupedObservable 仅将最后发出的项目保留在内存中?这样每个组的行为就像 BehaviorSubject 一样。

像这样的东西:

所以在内存中,我们只有每个的最后一项user

当订阅者订阅时,这两个项目会立即发布(每个项目都有自己的发射)。就像我们每个 BehaviorSubject 一样user

onCompleted() 永远不会被触发,因为人们可能会永远聊天。

我事先不知道user可以有什么价值。

0 投票
1 回答
41 浏览

c# - 在reactivex.net 中取消任务

假设我已经存在如下代码:

我尝试将 Rx 应用于前端代码:

它工作正常。

但是,一旦我将订阅绑定到IObservable,有什么办法可以阻止它继续中途获取数据?处置订阅不会停止枚举。

0 投票
3 回答
4785 浏览

java - 为什么我的 RxJava Observable 除非阻塞,否则不会发出或完成?

背景

我有许多 RxJava Observables(要么从 Jersey 客户端生成,要么使用 stubs 生成Observable.just(someObject))。它们都应该只发出一个值。我有一个组件测试,它模拟了所有 Jersey 客户端和使用Observable.just(someObject),并且我看到了与运行生产代码时相同的行为。

我有几个类对这些 observables 起作用,执行一些计算(以及一些副作用——我以后可能会让它们直接返回值)并返回空的 void observables。

有一次,在一个这样的课程中,我试图压缩我的几个源 observables 然后映射它们 - 如下所示:

然后将计算类全部组合并等待:

问题

问题是,processToNewObservable()永远不会被执行。通过消除过程,我可以看到这就是getObservable1()麻烦所在 - 如果我将其替换为Observable.just(null),一切都会按照我的想象执行(但在我想要一个真实的地方使用空值)。

重申一下,在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是在我的测试中getObservable1()返回的 Mockito 模拟。Observable.just(someValue)

调查

如果我转换getObservable1()为阻塞,然后将第一个值包装在 中just(),一切都会按照我的想象执行(但我不想引入阻塞步骤):

我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,并且zip看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable。但是,我尝试添加.cache()到我认为相关的每个可观察源,但这并没有改变行为。

我还尝试在 zip 之前在 getObservable1 上添加 next / error / complete / finally 处理程序(不将其转换为阻塞),但它们都没有执行:

问题

我对 RxJava 很陌生,所以我很确定我错过了一些基本的东西。问题是:我能做什么愚蠢的事?如果从我到目前为止所说的内容来看这并不明显,我可以做些什么来帮助诊断问题?

0 投票
1 回答
656 浏览

java - vertx httpserver 等待端口监听

我在运行我的服务器的 RC 时遇到问题。在这一次启动之前,我的集成测试正在访问服务器。在我开始测试之前,如何确保服务器的端口实际上正在侦听?

我正在使用 httpServer.listen(port, handle) 功能,但似乎不像 API 指定的那样工作。

有什么建议么?

0 投票
1 回答
2160 浏览

java - RxJava Android 压缩许多不同类型的请求

我对 Observable.zip 函数有点“审美”问题。你像这样使用它:

请求的数量等于“deal-with-it”函数中的参数数量。现在,如果你有更多的请求,比如 6 个,你最终会得到一个函数,它需要 6 个参数(假设它们都有不同的类型)。就是感觉不干净。有没有办法将它们包装在一个类中,例如属性?

我现在的现实问题是,我使用 zip 来加载设置数据:

  1. 从 rest api 加载列表视图的项目
  2. 加载这些项目的 id,这些项目作为收藏夹存储在 db 中
  3. 合并两个列表,因此最喜欢的项目将 isFavorite > 设置为 true
  4. 更新列表视图

现在没那么糟糕了。但是我想在#3 处添加 2-3 个对其他数据结束函数的请求,这将增长到具有太多函数参数的 4 行猛犸象。

我想,我可以使用嵌套的 Observable.zip,但可能很危险。有没有更优雅的方式来包装这些参数?

我很高兴看到你的建议。

0 投票
1 回答
7056 浏览

javascript - 使用 RxJS Observable 流式传输 JSON

我试图了解一些关于 RxJs 的事情。我想做的是使用一些 JSON 数据,并在数据进入时立即开始在 DOM 上呈现这些数据。我已经设置了流请求、响应和显示。它的每一个输出都很好,但它是一次性完成的,而不是随着时间的推移。

我想开始在页面上显示数据,而不是等待整个文件完成然后立即显示,这会产生很长的等待时间。

0 投票
1 回答
51 浏览

java - Observables 并行执行

我正在用 reactiveX Zip 做一些实验,我注意到我在我的 zip 中定义的 observables 是一个接一个地按顺序执行的。我认为 zip 的好处是在 zip 中定义的每一个 observable 都由一个线程执行,所以它们都是并行执行的。有什么方法可以实现我想要的吗?这是我的 zip 示例

0 投票
3 回答
1156 浏览

java - RxJava - 为什么执行器只使用一个线程

我创建了一个固定线程池来处理每 300 毫秒发出的事件,并假设该过程需要 1000 毫秒。假设多线程可以工作,但只有一个线程被重用。

如果我将 sleepTime 设置为小于 300 毫秒,则处理线程会更改,但这没用。

问题:我该怎么做才能使其并发?为什么程序重用线程?

先感谢您

日志