问题标签 [reactivex]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5035 浏览

rx-java - How can an Observable be paused without losing the items emitted?

I have an Observable that emits ticks every second:

I'd like to pause this Observable so it stops emitting numbers, and resume it on demand.

There are some gotchas:

  • according to the Observable Javadoc, interval operator doesn't support backpressure
  • the RxJava wiki about backpressure has a section about Callstack blocking as a flow-control alternative to backpressure:

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

Is there a way to pause an interval Observable? Or should I implement my own 'ticking' Observable with some backpressure support?

0 投票
2 回答
1353 浏览

rx-java - ReactiveX 超时后发出空值或标记值

寻找一种干净的方法来将源转换为在一段时间内不发射项目后Observable发射单个(或标记值)。null

例如,如果源 observable 发射1, 2, 3然后在发射前停止发射 10 秒,4, 5, 6我希望发射的项目是1, 2, 3, null, 4, 5, 6.

该用例用于在 UI 中显示值,其中显示的值应该变成破折号-,或者N/A如果最后发出的值是陈旧的/旧的。

我调查了timeout操作员,但它Observable在超时发生时终止,这是不可取的。

使用 RxJava。

0 投票
1 回答
654 浏览

rx-java - 使用 RxJava 重复改造调用,直到条件有效

问题:点击一个端点,直到返回的项目列表为空。每个连续调用都会有一个更新的 packetId 查询参数,它是最后一项的 packetId。

设置:Retrofit2 与 Rx 适配器

尝试

这里的 packetId 不会被更新,因为 observable 将使用与它创建时相同的参数。

获取结果后,我总是可以再次调用此方法。

问题:有没有一种被动的方式来做到这一点?我假设它将涉及一些以某种方式反馈结果的操作员。

0 投票
2 回答
355 浏览

android - 如何用 RxJava 演示者替换 EventBus 模式?

Controller+模式在 Android中EventBus工作得很好,但是到处触发事件变得非常混乱。如果控制器多次被要求加载相同的数据,很容易让它第一次开始加载,并在加载时忽略后续请求,并在加载完成后开始监听未来的加载请求。如何使用 RxJava 做到这一点(使用 MVP 演示者?)

设想

我的 android 应用程序中有一个单例ColorModel,它加载我最喜欢的颜色(来自多个来源),作为Observable<String>(String == hexcode)。

我的ColorPresenter钩子ColorModel,将String十六进制代码转换为Color可以显示在ColorView.

这些是我的逻辑的主要部分:

  1. Observable<String> getColor()(来自ColorModel
  2. Observable<Color> getColorForView()(来自ColorPresenter
  3. subscribe()(在ColorView

问题

  • #1 在每个订阅上创建。如果我有多个演示者调用#1,这是非常浪费的。我不希望重复此步骤(仅在第一次调用时启动,然后在之后缓存)。
  • 当我在 期间取消订阅时onStop(),#2 被取消(需要),但是取消订阅会上升到 #1 也被取消(不需要)。
0 投票
2 回答
1154 浏览

javascript - 我可以从链接的 AJAX 调用中生成一系列 Rx Observables 吗?

技术背景

我在 Web 浏览器中使用 jQuery 来调用从日志中返回一组条目的 API。

API 请求有两个参数:

  • offset_timestamp:一个整数,指定我想要的最早的可能条目
  • limit:一个整数,指定要返回的记录数

示例请求和响应

如果我使用普通的 jQuery + 回调,我想我必须递归地链接 AJAX 调用。类似于以下内容:

自然,我宁愿拥有一系列 Observables(每个都持有一批条目),这样我就可以flatMap()用来获取所有条目的序列。

问题

如果从 jQuery 调用创建一个 Observable,例如Rx.Observable.fromPromise(jQuery.ajax({...})),是否可以使用 RxJS 将任意数量的这些 Observable 链接在一起,使用response.next_timestamp来自先前调用的值在后续调用的参数中?

0 投票
0 回答
430 浏览

android - RxJava 重新创建 observable onError

所以我有我的form模型,它包含我想要验证的所有数据,然后发送到服务器。让我们尽可能简单——isFormValid否则 api 请求应该返回Observable.errr(throwable)应该onError()subscriber.

好的,现在假设用户输入了无效数据,submitForm()被调用并 - 确定 -onError被调用subscriber,然后onComplete。然后用户输入有效数据并submitForm()再次被调用。

现在这是问题所在 - 在第二次submitForm()调用中没有任何反应!至少flatMap Func1和第二个flatMap Func2没有被调用。为什么?我究竟做错了什么 ?这是一个架构缺陷吗?

0 投票
1 回答
1588 浏览

javascript - RxJS 减少一个 ReplaySubject

我正在使用 ReactiveX/RxJS 版本。

假设我有一个 Rx.ReplaySubject ,它每 2 秒发出一个包含 id 和带有值的数组的对象。我想减少这个值数组并得到它们的总和。

问题是 ReplaySubject 是一个热的可观察对象并且它永远不会完成,至少我不希望它完成,因为我想要每 2 秒该对象值的总和。但是为了使用 reduce 运算符,应该完成 observable。那么,我应该如何进行呢?

EG 不工作代码:

0 投票
2 回答
1257 浏览

java - RxJava 和 RxAndroid 的结合?

我的场景与这张图片非常相似:

在此处输入图像描述

应用程序的流程将是这样的:

  1. 视图需要更新。
  2. 创建一个 observableRxAndroid用于从缓存/本地文件中获取数据。
  3. 更新视图。
  4. Retrofit使用和再次进行网络调用,使用RxJava来自 Web 服务的新数据再次更新视图。
  5. 使用新数据更新本地文件。

所以,我更新了两次视图(一次来自本地文件,之后通过网络服务)

如何使用RxJavaand实现结果RxAndroid?我在想的是

  1. 创建一个 observable1 以从本地文件系统获取数据。
  2. 在我可以创建另一个的onNext方法中。observable1observable2
  3. observable2.onNext()我可以更新本地文件。现在我将如何view使用更新的数据(加载到文件中)来更新?

什么是好方法?

0 投票
1 回答
946 浏览

java - 在没有 lambda 表达式的 Java 6 中创建 ReactiveX observable

我找到了一个如何创建可观察对象(ReactiveX)的示例:

但是我的项目不支持 Java 8 的 lambda 表达式。我找不到如何在没有 lambda 表达式的情况下使用 ReactiveX observable 的示例。

0 投票
1 回答
202 浏览

java - 在 RxJava 中加入两个大型数据集

我正在使用 RxJava 处理两个需要通过 ID 连接的大型数据集(数百万条记录)。这两个数据集不一定包含相同的记录。但它们是按 ID 排序的。

我发现该join方法可以用于此,下面的实验进行了“完全连接”,并通过匹配的记录进行过滤。

这适用于一小部分示例,但对于大量示例来说效率很低。

所以我的问题是:看到集合是按键排序的,有没有办法可以使用这些选择器/窗口函数来限制连接,所以我不必将 300 万条记录连接到 300 万条记录?

还是我一起做错了?