问题标签 [reactivex]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
62 浏览

rx-java - 稍后在其树中获取 Observable 的初始发射

Observable<String> myObservable给定一个以不规则间隔发出值的热点。我希望能够根据我想要flatMap的结果获得初始值。例如,考虑下面的代码:obs1 Observableobs1flatMapobs2myObservable

一个解决方案是将其存储myObservable在一个变量中并在第二个获得其最新值,flatMap但我也无法实现这一点,所以我正在寻找一个更优雅的解决方案。谢谢你。

0 投票
1 回答
976 浏览

python - 如何从 Rx Observable 中提取值?

我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清洁的方法。

我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息以返回特定组。我一生都无法弄清楚如何从 Observable 中获取这些值(不将它们分配给全局变量)。

代替.subscribe()该流的末尾,我尝试使用.to_list()来获取一个列表,我可以在该列表上迭代“正常方式”,但它只返回一个类型的值:

<class 'rx.anonymousobservable.AnonymousObservable'>

我在这里做错了什么?

我见过的每个 Rx 示例都只打印结果。如果我希望它们在我可以同步使用的数据结构中怎么办?

0 投票
1 回答
488 浏览

typescript - 如何将 Observable 的 forkJoin 与自己的事件一起使用

我在我的 angular2 应用程序中使用响应式主题来表示事件。

当我做这样的事情时:

一切正常,但我想等待两个事件触发,然后执行一些操作。我找到了 Observable.forkJoin 但我不能让它与主题一起使用。像这样的代码不起作用

你能帮我解决这个问题吗?

最好的问候 Krzysztof Szewczyk

0 投票
4 回答
3280 浏览

angular - 如何将对象传递给 rxjs subscribe() 回调函数?

我正在开发一个 Ionic2 应用程序,使用 cordova-plugin-network-information,我从我的 app.ts 订阅连接和断开事件,并希望能够将我的 NavController 和加载组件的引用传递给订阅() 回调,因此每当断开连接的事件触发时,我都可以在 UI 顶部向用户显示加载覆盖。我看到在回调中对“this”对象的引用更改为一个名为“SafeSubscriber”的对象,我认为这是它的 Observer 对象的 rxjs 类型类,我在这里遇到的问题是我没有办法得到那些app.ts 中可用的实例可用于回调内的此代码,使用 Chrome DevTools 我也无法找到脱离此上下文的方法以访问 App 对象本身。

这是我的代码:

这是我在 Chrome DevTools 中查询“this”对象时得到的结果(这应该将其原始上下文保留在 lambda [fat arrow] 函数中,这是正确的吗?)

在此处输入图像描述

我已经尝试在订阅之前设置一个“那个”对象,以便变量“this”不会干扰回调“this”范围,它在这种情况下不起作用,因为“那个”是立即声明的在触发断开事件时,在回调内部未定义 subscribe() (let that: any = this;) 之前。

我知道这不是放置直接更改 UI 的代码的最佳位置,但我看不到其他位置,因为我需要的是一个全局事件处理程序,它通过在未检测到连接和用户时设置此覆盖来工作正在查看应用程序中的某些页面。

我认为应该有一个非常简单和优雅的解决方案,但我似乎无法找到它。有没有办法将参数传递给 subscribe() 函数?某种带有我需要的引用的对象?

提前致谢。

0 投票
2 回答
66 浏览

system.reactive - ReactiveX 与多个消费者一起重试

快速提问,因为我觉得我一定错过了什么。我在这里使用 rxjs 是因为它是我面前的东西,我相信这是一个一般的 reactiveX 问题。

假设我有一组像这样的 Observable:

所以,这工作正常。不过有问题。有时连接失败。

所以,如果我这样做event_stream.retry(),效果很好。当它失败时,它会重做网络调用并获得一个新v的用于建立新连接。

问题

如果我想把两件东西锁在我的network_request. 也许我希望用户界面在每次网络调用完成时做一些事情,比如v在用户界面中显示一些东西?

我可以:

如果我不这样做,share那么它会发出两个请求,这不是我想要的,但是shareevent_stream稍后出现错误时,它不会重试网络请求,因为 refcount 仍然为 1(由于ui_stream) ,所以它立即返回完成。

我想要的是

这显然是我为了解释我的困惑而编造的一个小例子。我想要的是,每次event_stream(长期连接)的结果出现错误时,都会发生以下所有情况:

  1. 再次发出网络请求
  2. 该请求的新响应用于建立新连接并event_stream继续新​​事件,就像什么都没发生一样
  3. 也会发出相同的响应ui_stream以进行进一步处理

这感觉不是一件复杂的事情,所以在拆分/扇出 RX 事物时,我一定只是误解了一些基本的东西。

我认为我可以做但想避免的解决方法

我正在寻找导出这些 observables,所以我不能只是重新构建它们然后说“嘿,这是新事物”。我希望event_stream所有下游处理都不知道有断开连接。对ui_stream. 它刚刚获得了新的价值。

我可能可以使用 aSubject作为生成计数器来解决一些问题,每次我希望一切重新启动时都会 ping 通,然后基于此将其network_request放入 aflatMap中,这样我就可以打破share......但这感觉就像一个非常hacky的解决方案,所以我觉得必须有比这更好的方法。

我从根本上误解了什么?

0 投票
1 回答
645 浏览

java - RxJava:void observable 的 zip 运算符线程中断错误

zip 运算符在两个 Observable 的情况下如何工作?对我来说,当一个完成时它会打断另一个。除了 void 之外,它会等待每个 observable 完成,然后返回结果。

代码是

我正在使用 cloudinary uploader().destroy() 删除图像,如果成功则返回 Void 否则异常/错误。

它立即执行 firebase 查询,并在成功时中断 image delete observable。

这是正确的行为吗?还是我的代码中有错误?

0 投票
1 回答
531 浏览

java - RxJava retryWhen 和 onError

我开始使用 rxJava 进行开发,我有以下情况:

我有一个 observable,我想让一个订阅者处理错误onError,而另一个订阅者处理retryWhen.

重试时吞下错误,如何避免吞下错误?

0 投票
1 回答
213 浏览

retrofit - 在另一个可观察对象的操作中包装可观察对象

我刚开始使用 ReactiveX 和 Retrofit,请考虑以下示例改造示例,

在另一个 action1 中观察是个好主意吗?像下面

0 投票
0 回答
271 浏览

java - 发出自发光项目列表的 RxJava 最佳实践

我正在使用 RxJava 在 Android 中构建反应式 UI。假设我有一个ListViewRecyclerView显示用户列表。我希望能够为我的适配器提供整个项目列表,每个项目都处于它们的最新状态,只要它们中的任何一个被更新。

这是我目前对该想法的实现:

  1. 为每个用户创建一个Observable,它是一些热点的下游Observable。所以只要有变化,它就会被热点触发,Observable进而发出最新的用户数据。
  2. 用于Observable.combineLatest创建一个合并点,我将在其中将用户组合成一个列表
  3. 将列表发送到 UI

我能够让它工作,但有时当列表包含大量用户(即 1000 个用户)时,或者当我因为删除/插入(因此需要combineLatest再次调用)而需要动态更新列表时,我观察到之间存在明显的滞后发生更新并最终发出列表。

所以我的问题是,实现这种自我更新项目列表并作为一个整体发出的最佳和最有效的方法是什么?还是我根本不应该以列表的形式发出?

0 投票
1 回答
323 浏览

ios - RxSwift:立即接收事件,除非最后一个事件在一定的时间间隔内被处理

RxSwift / Reactivex 的新手。基本上,我要做的是在发生任何事情时进行服务器调用,但要确保它的频率不超过每 10 秒一次。如果可能,减少频率。

例如,每当生成事件(“需要更新”)时,如果自上次调用以来已超过 10 秒,我想立即调用服务器。如果时间过去了,我想在最后一次通话后的 10 秒内拨打电话。在这 10 秒内生成了多少事件并不重要。

我查看了 的描述,throttle但如果事件发生得很快,它似乎会饿死,这是不可取的。

我怎样才能做到这一点?