问题标签 [rx-scala]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
434 浏览

rx-java - 将 RxJava Observable 转换为 RxScala Observable

我正在用 Scala 编写一个小型 JavaFx 应用程序,我喜欢为此使用 RxScala 和 RxJavaFx。使用以下代码,我得到一个RxJava Observable

显然,当我使用 Scala 编写代码时,我希望使用RxScala Observable来代替。现在我找到了这个帖子,说我要么要导入隐式转换类,JavaConversions._要么直接调用toScalaObservable。但是,第一个选项不适合我,第二个选项看起来有点难看:

选项1:

这不起作用,因为 RxJava 也提供了一个map运算符,尽管参数的类型在技术上是不同的。

选项 2:

这行得通,但老实说,这看起来真的很可怕!

问题 1:我是否遗漏了什么,或者这些真的是仅有的两个选择吗?

我的意思是,要将 Java 的标准集合转换为 Scala 的集合(反之亦然),您有两个选择:JavaConvertersJavaConversions. 前者引入了关键字asJava/ asScala,而后者相当于rx.lang.scala.JavaConversions隐式地为您进行转换(如果您幸运的话!)。谷歌搜索这两者的偏好,很明显,大多数人更喜欢更显式的隐式转换 from JavaConverterswithasScala关键字进行更显式的隐式转换。

问题 2:是否有类似JavaConvertersRxScala 的构造?这将使上面的代码更加简洁,我想说:

0 投票
1 回答
287 浏览

rx-java - 基于 RxJava 2.0 的 RxScala

RxJava 2.0 即将完成(RC3)。是否有基于此版本的(实验性)RxScala 实现?RxJava 2.0 以 Java 8 为目标。这对 RxScala 是否有任何影响,例如 Scala 2.12 是否会成为先决条件?

0 投票
1 回答
317 浏览

scala - Akka Streams - 结合最新的操作

我想将最新的与 Akka Streams 结合起来,如此所述。

我不知道该怎么做-请帮忙!

谢谢,瑞恩。

0 投票
1 回答
374 浏览

asynchronous - Observer.onNext 中的 RxJava/RxScala 异步代码

假设您要将来自某个流的事件存储到数据库中,并且该数据库的客户端库是异步的。

例如,当发出下一个事件时writeEvent(event: MyEvent): Future[Boolean],您必须在观察者内部调用一个方法。onNext除了阻塞之外,还有什么好方法可以做到这一点Future

我目前看到的关于如何实现这一点的唯一方法是创建一些自定义Scheduler,允许我将线程返回到池中,直到内部onNext的异步代码完成。

0 投票
1 回答
47 浏览

scala - 基于资源稀缺性的背压可观测

在 RxJava 1 / RxScala 中,如何在以下情况下限制/背压可观察的源?

一个可能的解决方案就是阻塞直到。哪个有效,但这非常不雅,并且会阻止多个同时请求:

0 投票
2 回答
398 浏览

rx-java - 如何创建两个排序后的可观察对象之间差异的可观察对象?

假设我有两个可观察的排序双精度数。我想将它们之间的区别作为可观察的。例如:

这个命令式的实现很简单:在你还没有到达的一侧保留一个项目列表,并从另一侧“发出”这些项目。

在 RFP 的世界中,对此的规范方法是什么?我专门使用 RxScala。

0 投票
1 回答
170 浏览

scala - 将 TCP 套接字转为 Observable 的 Array[Byte]

在我的 Android 应用程序中,我需要使用 aSocket来发送和接收字节数组。为方便起见,我想使用Observable连接到Socket.

在互联网上我发现了这个代码:

它可以工作,但一次输出一个字符,例如当发送一个“hello there”字符串时,输出是:

但我想在订阅中接收缓冲的字节数组。我怎样才能做到这一点?

0 投票
1 回答
71 浏览

scala - 重启连接到资源的 Observable

在以下代码中,我将 TCP 套接字转换为Observable[Array[Byte]]

与远程服务器的连接可能随时中断,在这种情况下,我想Observable尝试自动重新连接但s.retry不执行任何操作。我怎样才能做到这一点?也可以在当前“内部”完成Observable而不创建新的并重新订阅吗?

0 投票
1 回答
84 浏览

java - 从未调用 RxScala ConnectableObservable.doOnSubscribe?

doSomething永远不会被调用。RxJava 的完全相同的代码可以正常工作。无论出于何种原因,它似乎从未传播到底层的 Java Observable

更新:所以我的解决方法是

0 投票
1 回答
1220 浏览

scala - 在 RxJava/RxScala 中结合 groupBy 和 flatMap(maxConcurrent, ...)

我有传入的处理请求,由于耗尽共享资源,我不希望同时处理太多。我也希望共享一些唯一键的请求不要同时执行:

但是,上述方法不起作用,因为每个键的可观察对象永远不会完成。实现这一目标的正确方法是什么?

什么不起作用: