问题标签 [rx-scala]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
reactive-programming - 如何观察反向依赖顺序?
我想以反向依赖顺序观察对象树,但不知道要使用什么组合器。具体情况是遍历 AWS 资源以便删除,例如在删除 S3 存储桶之前删除 S3 对象,如下所示:
所以,最终的combined
observable 应该在发射桶之前发射所有的对象。我应该为此使用什么组合器?
multithreading - RxJava:订阅后阻塞可观察对象?
我特别需要一个可观察的。
通常,我的 observables 在不同的线程中运行。但是,有时他们需要在订阅过程中阻塞另一个线程。未来的行为方式。
一个例子:
突然,另一个线程上发生了一个事件,该事件向当前线程发出信号,表明它应该等待订阅的执行s
。(一个例子是 Android 的onPause
.)
我怎么做?我如何等待订阅s
,甚至可能检索所有结果?
(主题?)
rx-scala - 如何导致压缩两个 Observable 的背压异常?
我正在尝试编写一个测试来查看 onBackPressureDrop 在 RxScala 中的使用。
我正在用一个简单的压缩功能压缩一个快速的 Obserable 和一个慢速的 Obserable。
奇怪的是,RxJava 中的相同示例会产生异常,但使用 RxScala 似乎不需要 onBackPressureDrop 指令。
测试如下所示:
由于没有背压,我怎样才能使这段代码失败?
scala - 在 Rx(或 RxJava/RxScala)中,如何制作一个自动重置状态锁存映射/过滤器来测量流内经过的时间来触摸障碍?
抱歉,如果问题措辞不佳,我会尽力而为。
如果我有一个以时间为单位的值序列,Observable[(U,T)]
其中 U 是一个值,T 是类时间类型(或者我想的任何差异),我怎么能编写一个操作符,它是一个自动重置的一键式屏障,它在 时是无声的abs(u_n - u_reset) < barrier
,但如果碰到屏障就会吐出t_n - t_reset
,此时它也会重置u_reset = u_n
。
也就是说,该运算符接收到的第一个值成为基线,它什么也不发出。此后,它会监视流的值,一旦其中一个值超出基线值(高于或低于),它就会发出经过的时间(由事件的时间戳测量),并重置基线。然后将处理这些时间以形成对波动率的高频估计。
作为参考,我正在尝试编写http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990中概述的波动率估计器,而不是测量标准偏差(在常规均匀时间的偏差),您重复测量突破某个固定障碍量的障碍所需的时间。
具体来说,这可以使用现有的运算符编写吗?我对如何重置状态有点困惑,尽管也许我需要创建两个嵌套运算符,一个是一次性的,另一个是不断创建一次性的...我知道可以通过编写来完成一个手,但是我需要写我自己的出版商等等。
谢谢!
rx-scala - 在 RxScala 中组合 Observables
我想知道是否有人可以在这里给我一些提示。我正在学习 RxScala,我有以下练习要做: - 实现一个可观察的对象,它每 5 秒和每 12 秒发出一个事件
我想知道下面的代码是否可以做到?还没有找到很多关于 OBservable 组合器的文档
谢谢和问候马可
exception-handling - 将大图中的 [rx] 可观察到的异常跟踪到源代码
当你有一个大的 Observable 图(即 observable 多次使用merge
,groupBy
等组成join
)并且抛出异常时,有时很难弄清楚异常的来源。我想知道是否有可能找出在源文件中调用 Observable 运算符的位置。一个例子应该更清楚地说明这一点。
例如,给定以下内容IllegalStateException: Only one subscriber allowed!
和堆栈跟踪,我想知道是否有可能找出我的源文件中调用的 line numberoperatorMerge
等。operatorFilter
operatorGroupBy
是否可以通过使用调试器、打印语句或其他方式以某种方式做到这一点?
之所以出现这个问题,主要是因为 Observable 的全部意义在于在执行时将 a) 代码与 b) 解耦。但是对于调试程序来说,这是一场噩梦。因此,重复我上面的问题,我想知道是否可以将每个组合追溯到源代码中的原始行。
scala - 可观察的异常处理
我正在学习 RxScala 并来到这个非常合成的片段。我正在尝试处理 onError 块中的异常:
万一出现异常,我希望是这样的:
但我得到的是:
我错过了什么?我应该在观察者处“手动”调用 onError 吗?感谢任何帮助。
scala - 如何手动更新 observable?
我是 reactivex 和rxscala的新手,可以创建Observable
这样的:
我可以将新字符串subscriber
放入Observable.apply
.
observable
外面可以更新吗?我的意思是,有没有类似的方法putNext
:
让我把新东西放到现有的 observable 上?
reactive-programming - 如何从 PublishSubject 获取历史数据?
如何从 a 中获取所有历史数据PublishSubject
?
它打印:
你可以看到没有b: 1
打印。
如果我必须使用PublishSubject
(因为我需要从多个地方更新一个 observable),我怎样才能确保以后的订阅者也可以获得所有的历史数据?