问题标签 [reactivex]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
5 回答
38338 浏览

c# - 为什么在 .NET 响应式扩展中不推荐主题?

我目前正在掌握 .NET 的反应式扩展框架,并且正在通过我找到的各种介绍资源(主要是http://www.introtorx.com

我们的应用程序涉及许多检测网络帧的硬件接口,这些将是我的 IObservables,然后我有各种组件将消耗这些帧或对数据执行某种方式的转换并生成一种新类型的帧。例如,还有其他组件需要显示每第 n 帧。我确信 Rx 将对我们的应用程序有用,但是我正在为 IObserver 接口的实现细节而苦苦挣扎。

我一直在阅读的大多数(如果不是全部)资源都说我不应该自己实现 IObservable 接口,而是使用提供的函数或类之一。根据我的研究,创建一个似乎可以满足Subject<IBaseFrame>我的需求,我将拥有从硬件接口读取数据然后调用Subject<IBaseFrame>实例的 OnNext 函数的单线程。然后,不同的 IObserver 组件将接收来自该主题的通知。

我的困惑来自本教程附录中给出的建议,其中说:

避免使用主题类型。Rx 实际上是一种函数式编程范式。使用主题意味着我们现在正在管理状态,这可能会发生变化。同时处理变异状态和异步编程是非常困难的。此外,许多运算符(扩展方法)都经过精心编写,以确保订阅和序列的正确和一致的生命周期得以维持;当你介绍主题时,你可以打破这个。如果您明确使用主题,未来的版本也可能会出现显着的性能下降。

我的应用程序对性能非常关键,我显然会在使用 Rx 模式之前测试它的性能,然后再将其用于生产代码;但是我担心我正在使用 Subject 类做一些违背 Rx 框架精神的事情,并且该框架的未来版本会损害性能。

有没有更好的方法来做我想做的事?无论是否有任何观察者,硬件轮询线程都将持续运行(否则硬件缓冲区将备份),因此这是一个非常热的序列。然后我需要将接收到的帧传递给多个观察者。

任何建议将不胜感激。

0 投票
0 回答
116 浏览

reactive-programming - 如何从 PublishSubject 获取历史数据?

如何从 a 中获取所有历史数据PublishSubject

它打印:

你可以看到没有b: 1打印。

如果我必须使用PublishSubject(因为我需要从多个地方更新一个 observable),我怎样才能确保以后的订阅者也可以获得所有的历史数据?

0 投票
1 回答
37 浏览

java - 如何用自定义数字和不同的延迟构造一个 Observable?

为了用 rxscala 测试我的反应程序,我需要构造这样一个Observable

哪一个

  1. 公布号码1
  2. 然后等待1s
  3. 公布号码4
  4. 然后等待3s
  5. 公布号码2
  6. 然后等待2s

我有一个丑陋的解决方案,使用Threadand ReplaySubject

有没有更好的解决方案?

0 投票
1 回答
146 浏览

scala - `interval`上的`delay`会抛出`NoSuchElementException`?

一些简单的rxscala代码:

运行它时,它会抛出异常:

哪里错了?

0 投票
0 回答
260 浏览

reactive-programming - 如何从可观察方取消订阅所有观察者?

使用 rxscala,我们可以像这样订阅 observables:

是否可以从侧面取消订阅所有观察者(有 4 个)stream

0 投票
1 回答
193 浏览

java - ReactiveX:计算 Observable 中不同元素的频率

我有一个Observable<String>. 我想把它变成一个Map<String, Int>告诉我每个不同字符串的出现次数。

observable 包含约 10 亿个元素,其中 1000 个是不同的(因此将整个数据集存储在 RAM 中不是一种选择)。目前我迭代Observable并更新了一个HashMap. 我还确保在同一个线程上进行观察以避免竞争条件。但是,获取元素频率本质上应该很容易并行化,因此利用它会很好。

有没有办法做到这一点?

0 投票
1 回答
231 浏览

java - 如何正确理解 RxJava 的 groupBy 的行为?

一般来说,我对 RxJava 和 FP 还是很陌生。我想写一个代码来加入两个Observables。假设我们有两组整数:

  • [0..4]以 键选择器为模2,给出(key, value) = {(0,0), (1,1), (0,2),...}
  • [0..9]以 键选择器为模3,给出(key, value) = {(0,0), (1,1), (2,2), (0,3), (1,4),...}

我加入他们的步骤如下:

  1. 按其键对每个集合进行分组。第一组使用键0和创建两个组1。第二个使用键创建三个组0,12
  2. 制作两组组的笛卡尔积,总共给出 6 对组,键为:0-0, 0-1, 0-2, 1-0, 1-1, 1-2
  3. 只过滤那些两边都有相同键的对,只留下0-01-1
  4. 在每一对中,制作左右组的笛卡尔积。

下面是计算笛卡尔积的辅助类:

这是加入的代码:

但是,输出不正确:

如果我删除包含 的行filter,则根本没有结果。正确的输出应该就像运行这个:

这使:

有人可以解释这种行为吗?非常感谢。

注意:我使用org.apache.commons.lang3.tuple.ImmutablePair以防您想知道。

0 投票
2 回答
1318 浏览

java - Rx 中的 groupBy、过滤器和内存泄漏

根据以下文件groupBy

注意:AGroupedObservable将缓存它要发出的项目,直到它被订阅。出于这个原因,为了避免内存泄漏,你不应该简单地忽略那些GroupedObservable与你无关的s。相反,您可以向他们发出信号,表明他们可能会通过对他们应用类似的运算符take(int)(0)来丢弃他们的缓冲区。

有一个RxJava 教程说:

在内部,每个 Rx 操作员做 3 件事

  1. 它订阅源并观察值。
  2. 它根据操作员的目的转换观察到的序列。
  3. 它通过调用 onNext、onError 和 onCompleted 将修改后的序列推送给自己的订阅者。

让我们看看下面的代码块,它只从 中提取偶数range(0, 10)

我的问题是:

  1. 这是否意味着filter运营商已经暗示订阅每个组产生的组groupBy或只是Observable<GroupedObservable>一个组?

  2. 这种情况下会不会有内存泄漏?如果是这样的话,

  3. 如何正确丢弃这些组?替换filter为自定义的,其中 atake(0)后跟 a return Observable.empty()?你可能会问我为什么不直接返回take(0):这是因为filter不一定紧跟在 之后groupBy,而是可以在链中的任何位置并涉及更复杂的条件。

0 投票
1 回答
94 浏览

reactive-programming - 分组后无法打印可观察的项目

无法理解为什么以下rxscala代码无法按预期工作:

我按 分组projectEventsprojectName希望打印每个项目的项目。但是当我运行这段代码时,它只打印:

没有######### event in project印刷品。

我不明白为什么,我错过了什么吗?

0 投票
1 回答
2293 浏览

javascript - 创建后更改可观察对象的间隔/设置

在 RxJS 中,创建后如何更改间隔设置?

到目前为止我有这个,但它不起作用

它说“TypeError:observable.interval 不是 Sixage.js:10:14 的函数”

jsbin

编辑

这是使用公认答案后的最终产品。

jsbin