问题标签 [rx-java]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
2838 浏览

scala - 将 RX Observable 传递给演员(scala)是否安全?

一段时间以来,我一直在为 RX Java 使用 scala 绑定,并且正在考虑将其与 Akka Actors 结合使用。我想知道Observable在 Akka 之间传递 RX 是否安全/可能Actor例如,打印最多 20(每秒)的偶数平方的程序:

(请将此视为伪代码;它不会编译)。请注意,我正在send将 Observables 从一个演员转移到另一个演员。我想了解:

  • Akka 和 RX 会自动同步对 Observable 的访问吗?
  • Observable不能通过分布式系统发送 - 它是对本地内存中对象的引用。但是,它会在本地工作吗?
  • 假设在这个简单的示例中,工作将安排subscribeProducer. 我可以拆分工作,以便分别在每个演员身上完成吗?

题外话:我见过一些将 RX 和 Actor 结合起来的项目:

http://jmhofer.johoop.de/?p=507https://github.com/jmhofer/rxjava-akka

但它们的不同之处在于它们不只是Observable在参与者之间传递消息。他们首先调用subscribe()获取值,然后将它们发送到演员邮箱,并从中创建一个新Observable的。还是我弄错了?

0 投票
1 回答
234 浏览

scala - rxjava 即使在异常之后也会消耗下一个元素

有人可以解释为什么 rxjava 在“onNext”中发生异常之后甚至在调用“onError”之后消耗可观察序列中的下一个元素吗?

这是我的模拟:

其结果如下,您可以看到在引发异常后从序列中取出第 6项:

据我所知,我从常规序列创建的可观察集合是“冷的”,只有在观察者成功处理当前序列后,才能从源序列中获取下一个项目。我可能会怀疑一些线程问题,但这意味着“冷”不是那种不可能是真的“冷”,而且我可以清楚地看到始终使用相同的线程 ID。

那么为什么从序列中取出第6项?!

0 投票
1 回答
1922 浏览

android - 基于 Observable 的 API 和取消订阅问题

我正在尝试使用 Rx-Java 在 Android 上创建一个用于位置跟踪的类。我仍然无法弄清楚如何正确处理我的 Observable 的生命周期。我想要的是一个 Observable,它在第一次订阅发生时开始跟踪位置,并在最后一次订阅被丢弃时停止位置跟踪。到目前为止,我取得的成就是:

如您所见,Observable#defer当第一个客户端订阅时,我使用它来初始化位置回调。我不知道这是否是一个好方法,但这是我目前想出的最好的方法。我仍然缺少的是当我班的最后一个客户取消订阅我的 observable 时如何停止位置更新。或者也许它在 Rx 中是非惯用的,因为它并不明显?

我相信,这个用例应该是相当标准的,因此应该有一个标准/惯用的解决方案。很高兴知道它。

0 投票
1 回答
168 浏览

scala - 处理 groupBy 中嵌套 Observable 的类型擦除

考虑这个(人为的)代码:

它创建了一个 observable,其中前 10 个是CaseOne,接下来是CaseTwo。然后根据它们是偶数还是奇数对它们进行分组。Observable[(Long, Observable[MyCaseClass])]因此,根据 RX-Java 规范,分组的 observable 最终是 type 。

然而,当我们订阅一个 '​​groupedby' observable 时,类型擦除意味着嵌套的 Observable 的签名丢失了,即我们无法判断它是 CaseOne 还是 CaseTwo(它变成了 type Any) - 编译器会对此发出警告,所以输出是

我的问题是,在上述场景中,你如何处理嵌套 Observable 的类型擦除?

到目前为止,我唯一的解决方法是在键中包含一个额外的值,用于标识嵌套的 Observable 类型,然后将(使用asInstance)转换为这种类型。但这不是很好。

还要注意,虽然我没有even在这个例子中使用,但它直接反映了我的问题的结构。

0 投票
4 回答
7002 浏览

rx-java - 使用 RXJava 防止快速点击

我正在编写一个 android 应用程序并使用 rxjava 来处理用户输入事件。基本上我想要做的是,在单击按钮时发出,然后在之后的一段时间内(如一两秒)放弃后续发射,基本上是为了防止必须处理多次单击按钮。

0 投票
3 回答
36677 浏览

java - RxJava:如何组合多个具有依赖关系的 Observable 并在最后收集所有结果?

我正在学习 RxJava,并且作为我的第一个实验,尝试重写此代码中第run()一种方法中的代码(在Netflix 的博客中引用为 RxJava 可以帮助解决的问题)以使用 RxJava 提高其异步性,即它不会f1.get()在继续执行其余代码之前,请等待第一个 Future ( ) 的结果。

f3取决于f1。我知道如何处理这个问题,flatMap似乎可以解决问题:

接下来f4f5取决于f2. 我有这个:

这开始变得很奇怪(merge他们可能不是我想要的......)但最后允许我这样做,而不是我想要的:

这给了我:

这是所有的数字,但不幸的是我在单独的调用中得到了结果,所以我不能完全替换原始代码中的最终 println:

我不明白如何在同一行访问这两个返回值。我认为这里可能缺少一些函数式编程。我怎样才能做到这一点?谢谢。

0 投票
1 回答
2781 浏览

java - rx-java 中的套接字看门狗

我目前正在努力尝试使用 rx 实现 tcp 看门狗/重试系统,非常感谢您的帮助。

有一个 Observable,我希望有一个 Observable,它会定期检查我们是否仍然可以写入套接字。很简单,我可以这样做:

现在,如果可以获取套接字(服务器/链接在创建时关闭)或变得不可写(成功连接后服务器/链接无法访问),我想重试连接。理想情况下,通过重新订阅其 OnSubscribeFunc 使用重试运算符创建连接的套接字 Observable。如您所见,这会在套接字和看门狗 Observables 之间引入循环依赖。我玩弄了一段时间 switchMap/materialize... 为了传播最终的错误无济于事。

我接近放弃这个想法并使用副作用代码中的主题。但是在全球范围内应该有更好的方法:)

提前致谢!

0 投票
4 回答
2913 浏览

reactive-programming - 使用 RxJava 组合具有依赖关系的异步 Observable

I am new to reactive programming and confused about composing observables that have dependencies. Here is the scenario: There are two observables A, B. Observable A depends on a value emitted by B. (Therefore A needs to observe B). Is there a way to create an Observable C that composes A and B, and emits V? I am just looking for pointers in the RxJava documentation.

0 投票
1 回答
943 浏览

multithreading - 什么时候在 RxJava 中创建线程

假设我在Observable上有一堆转换:

除了最后一次调用之外,所有这些操作是否同步flatMap()?还是所有操作都在我告诉它订阅的线程上运行?

0 投票
2 回答
1782 浏览

gradle - FAILURE:构建失败并出现异常,原因是构建 RxJava 时无法下载工件

RxJava 项目的构建失败并出现以下异常的原因可能是什么?