问题标签 [rx-scala]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
91 浏览

reactive-programming - 如何观察反向依赖顺序?

我想以反向依赖顺序观察对象树,但不知道要使用什么组合器。具体情况是遍历 AWS 资源以便删除,例如在删除 S3 存储桶之前删除 S3 对象,如下所示:

所以,最终的combinedobservable 应该在发射桶之前发射所有的对象。我应该为此使用什么组合器?

0 投票
1 回答
893 浏览

multithreading - RxJava:订阅后阻塞可观察对象?

我特别需要一个可观察的。

通常,我的 observables 在不同的线程中运行。但是,有时他们需要在订阅过程中阻塞另一个线程。未来的行为方式。

一个例子:

突然,另一个线程上发生了一个事件,该事件向当前线程发出信号,表明它应该等待订阅的执行s。(一个例子是 Android 的onPause.)

我怎么做?我如何等待订阅s,甚至可能检索所有结果?

(主题?)

0 投票
1 回答
88 浏览

rx-scala - 如何导致压缩两个 Observable 的背压异常?

我正在尝试编写一个测试来查看 onBackPressureDrop 在 RxScala 中的使用。

我正在用一个简单的压缩功能压缩一个快速的 Obserable 和一个慢速的 Obserable。

奇怪的是,RxJava 中的相同示例会产生异常,但使用 RxScala 似乎不需要 onBackPressureDrop 指令。

测试如下所示:

由于没有背压,我怎样才能使这段代码失败?

0 投票
1 回答
230 浏览

scala - 在 Rx(或 RxJava/RxScala)中,如何制作一个自动重置状态锁存映射/过滤器来测量流内经过的时间来触摸障碍?

抱歉,如果问题措辞不佳,我会尽力而为。

如果我有一个以时间为单位的值序列,Observable[(U,T)]其中 U 是一个值,T 是类时间类型(或者我想的任何差异),我怎么能编写一个操作符,它是一个自动重置的一键式屏障,它在 时是无声的abs(u_n - u_reset) < barrier,但如果碰到屏障就会吐出t_n - t_reset,此时它也会重置u_reset = u_n

也就是说,该运算符接收到的第一个值成为基线,它什么也不发出。此后,它会监视流的值,一旦其中一个值超出基线值(高于或低于),它就会发出经过的时间(由事件的时间戳测量),并重置基线。然后将处理这些时间以形成对波动率的高频估计。

作为参考,我正在尝试编写http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990中概述的波动率估计器,而不是测量标准偏差(在常规均匀时间的偏差),您重复测量突破某个固定障碍量的障碍所需的时间。

具体来说,这可以使用现有的运算符编写吗?我对如何重置状态有点困惑,尽管也许我需要创建两个嵌套运算符,一个是一次性的,另一个是不断创建一次性的...我知道可以通过编写来完成一个手,但是我需要写我自己的出版商等等。

谢谢!

0 投票
0 回答
24 浏览

scala - RxScala 在工作表中和编译后的行为不同

我在 scala 工作表中玩弄 RxScala 和 Subject 。但是奇怪的事情发生了。 在此处输入图像描述

正如我们所看到的,订阅 c 也获取数字并输出它们。之后,我发现订阅 c 是 var 而不是 val。所以,我改变了声明。并获得关注 在此处输入图像描述

这个是对的。所以,我将这些代码复制到一个主函数并运行它。有趣的是,无论 c 是否为 val,输出都与第二张图片相同。

为什么会发生这些?即使使用相同的代码,在工作表中运行的程序与编译后的程序也不同?

0 投票
0 回答
52 浏览

rx-scala - 在 RxScala 中组合 Observables

我想知道是否有人可以在这里给我一些提示。我正在学习 RxScala,我有以下练习要做: - 实现一个可观察的对象,它每 5 秒和每 12 秒发出一个事件

我想知道下面的代码是否可以做到?还没有找到很多关于 OBservable 组合器的文档

谢谢和问候马可

0 投票
2 回答
452 浏览

exception-handling - 将大图中的 [rx] 可观察到的异常跟踪到源代码

当你有一个大的 Observable 图(即 observable 多次使用merge,groupBy等组成join)并且抛出异常时,有时很难弄清楚异常的来源。我想知道是否有可能找出在源文件中调用 Observable 运算符的位置。一个例子应该更清楚地说明这一点。

例如,给定以下内容IllegalStateException: Only one subscriber allowed!和堆栈跟踪,我想知道是否有可能找出我的源文件中调用的 line numberoperatorMerge等。operatorFilteroperatorGroupBy是否可以通过使用调试器、打印语句或其他方式以某种方式做到这一点?

之所以出现这个问题,主要是因为 Observable 的全部意义在于在执行时将 a) 代码与 b) 解耦。但是对于调试程序来说,这是一场噩梦。因此,重复我上面的问题,我想知道是否可以将每个组合追溯到源代码中的原始行。

0 投票
2 回答
303 浏览

scala - 可观察的异常处理

我正在学习 RxScala 并来到这个非常合成的片段。我正在尝试处理 onError 块中的异常:

万一出现异常,我希望是这样的:

但我得到的是:

我错过了什么?我应该在观察者处“手动”调用 onError 吗?感谢任何帮助。

0 投票
1 回答
151 浏览

scala - 如何手动更新 observable?

我是 reactivex 和rxscala的新手,可以创建Observable这样的:

我可以将新字符串subscriber放入Observable.apply.

observable外面可以更新吗?我的意思是,有没有类似的方法putNext

让我把新东西放到现有的 observable 上?

0 投票
0 回答
116 浏览

reactive-programming - 如何从 PublishSubject 获取历史数据?

如何从 a 中获取所有历史数据PublishSubject

它打印:

你可以看到没有b: 1打印。

如果我必须使用PublishSubject(因为我需要从多个地方更新一个 observable),我怎样才能确保以后的订阅者也可以获得所有的历史数据?