问题标签 [rx-scala]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
462 浏览

scala - 超时终止 Observable

我试图通过超时限制可观察的寿命:

我想区分以下结果:

  1. Observable 超时结束,没有得到结果
  2. 执行过程中抛出异常
  3. 执行成功,返回值

我可以在部分 onNext 和 onError 中毫无问题地处理案例 2 和 3,但是如何检测 observable 是否因超时完成?

还有一件事:尽管我的代码中有对 obeserver.onCompleted() 的调用,但我从来没有进入 block onComplete。为什么?

0 投票
1 回答
37 浏览

java - 如何用自定义数字和不同的延迟构造一个 Observable?

为了用 rxscala 测试我的反应程序,我需要构造这样一个Observable

哪一个

  1. 公布号码1
  2. 然后等待1s
  3. 公布号码4
  4. 然后等待3s
  5. 公布号码2
  6. 然后等待2s

我有一个丑陋的解决方案,使用Threadand ReplaySubject

有没有更好的解决方案?

0 投票
1 回答
146 浏览

scala - `interval`上的`delay`会抛出`NoSuchElementException`?

一些简单的rxscala代码:

运行它时,它会抛出异常:

哪里错了?

0 投票
1 回答
143 浏览

scala - Scala 未来派生的可观察回调没有被调用

使用 scala 2.11.7、rxscala_2.11 0.25.0、rxjava 1.0.16,我的oddFutures回调不会被调用AsyncDisjointedChunkMultiprocessing.process()

我创建Observable.from(Future)in的方式有什么问题Repository.query()吗?

0 投票
3 回答
984 浏览

scala - 如何在 RxScala/Java 中使用多个线程执行 map、filter、flatMap?

如何运行filtermap以及flatMap使用Observable多个线程:

目标是使用多个线程同时运行过滤和转换操作。

0 投票
1 回答
78 浏览

scala - Scala Observable Creation 阻止了我的未来

我想处理 ea。异步查询获取(每个查询可能多次获取)。为了做到这一点,我将处理函数(返回 a Future)传递给我的查询方法,以便为 ea 调用它。拿来。我事先不知道查询的结果大小;我只知道我提取的最大尺寸。因此,我的查询返回一个Observable(而不是一个Listfor ex。我需要事先知道大小)。唯一的问题是,当我使用Observable createor时apply,它会在内部阻塞,直到 myFuture完成,然后再调用下一个 onNext ——实际上,消除了我希望从期货中获得的性能提升。Observable from工厂方法不会阻塞,但它需要一个Iterable. 我可以传递一个可变的Iterable并随着新的获取而增长。有人有一个更具参考性的透明sol'n?这是代码:

0 投票
0 回答
260 浏览

reactive-programming - 如何从可观察方取消订阅所有观察者?

使用 rxscala,我们可以像这样订阅 observables:

是否可以从侧面取消订阅所有观察者(有 4 个)stream

0 投票
1 回答
36 浏览

reactive-programming - 为什么 `collect` 让观察者没有收到通知?

我正在使用 rxscala 并发现了一个非常微妙的问题,我的代码简化为以下内容:

关键点是!!!(2)线,它在projects自身内部。

它打印以下内容:

问题是没有### 222:线!

但是,如果我更改collect部分并添加None案例:

它将### 222:按我的预期打印行:

我不明白为什么。

PS:您可以在此处克隆代码:https ://github.com/freewind/rxscala-test/blob/master/src/main/scala/myrx/SubtleBug.scala

0 投票
1 回答
43 浏览

rx-scala - 在 RxJava 和 RxScala 中转换 Observable

我有一个 Observable,它发出如下条目列表:

我现在怎样才能将内容通过管道传输到另一个接收 MyEntry 的 Observable 中?我的意思是,对于原始 Observable 中的每个条目,我需要将它们传送到另一个具有以下类型签名的 Observable:

0 投票
1 回答
75 浏览

scala - Scala 中的反应式扩展 (Rx) - 在给定时间间隔后执行方法

由于我对 Reactive Extensions 很陌生,所以我对以下事情感到好奇。

通过在 Scala 中使用 Rx,我希望能够调用每秒从 API 检索内容的方法。

到目前为止,我已经了解了 Rx 中使用的创建运算符,例如 Interval、Timer 等。但不幸的是,我无法提出正确的解决方案。

有没有人有这方面的经验,最好是分享代码示例?

提前致谢!