问题标签 [reactivex]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
228 浏览

c# - 使用 IEventProcessor 注册观察者

这是取自此处的 IEventProcessor 实现的一部分:

将新事件添加到 EventHub 时,会调用 ProcessEventsAsync 方法,并且可以使用 foreach 循环来处理这些事件。我现在想使用例如此处讨论的 ObserverRegistry 将观察者添加到 SimpleEventProcessor 。建议的 ObserverRegistry 如下所示:

不幸的是,缺少一些东西。如何使用 SimpleEventProcessor 注册多个观察者,以便将事件从 ProcessEventsAsync 传递给所有观察者并最终传递给它们的 When 方法?

0 投票
1 回答
2690 浏览

swift - RxSwift:仅在满足要求时才压缩 Observables

想象一个允许编辑姓名姓氏年龄头像图像等字段的用户配置文件。当用户单击Save时,会针对已更改的每个值发送请求。

现在请考虑具有以下签名的函数:

通常我们只会zip像这样的请求:

当我只想对已更改的值执行请求时,我应该怎么做?

0 投票
1 回答
1238 浏览

java - Rx java OutOfMemory

编辑:看到这个更清晰和准确的问题: RxJava flatMap and backpressure 奇怪的行为

我目前正在使用 RxJava 编写数据同步作业,而且我对反应式编程和特别是 RxJava 库非常陌生。

我的工作很简单,我有一个元素 ID 列表,我调用一个 Web 服务按 ID 获取每个元素,进行一些处理并进行多次调用以将数据推送到数据库。

我使用 1 个 io 线程从 WS 加载数据,并使用多个 io 线程将数据推送到数据库。但是我总是以 OutOfMemory 错误告终。我首先认为从 WS 加载数据比将它们存储在数据库中要快。

但是作为 WS 调用和 DB 调用的同步调用,它们是否应该相互施加背压?

谢谢您的帮助。

我的代码几乎是这样的:

0 投票
1 回答
783 浏览

java - RxJava flatMap 和背压的奇怪行为

在使用 RxJava 编写数据同步作业时,我发现了一个我无法解释的奇怪行为。我对 RxJava 很陌生,希望能得到帮助。

简而言之,我的工作非常简单,我有一个元素 ID 列表,我调用一个 Web 服务来按 ID 获取每个元素,进行一些处理并进行多次调用以将数据推送到数据库。数据加载比数据存储快,所以我遇到了 OutOfMemory 错误。

我的代码看起来很像“失败”测试,但随后做了一些测试,我意识到删除该行:

让它起作用。失败的测试输出清楚地表明未消耗的项目堆积起来,这会导致 OutOfMemory。工作测试输出显示生产者将始终等待消费者,因此这永远不会导致 OutOfMemory。

这种行为背后的解释是什么?我如何在不删除 Observable.just(dt)) 的情况下解决我的失败测试,​​在我的真实情况下是 Observable.from(someListOfItme)

0 投票
1 回答
175 浏览

c# - 将工作推给观察者

我有一个监听器,它以 IPayload 的形式接收工作。听众应该把这项工作推给真正做这项工作的观察者。这是我第一次粗略地尝试实现这一目标:

这是遵循观察者/可观察模式(即pub sub?)的有效方法吗?我的理解是,NotifyObservers 也会为每个有效负载带来威胁。这个对吗?非常欢迎任何改进建议。

请注意,所有观察者都必须在以有效载荷的形式将新工作传递给他们之前完成他们的工作——“观察”的顺序无关紧要。基本上,侦听器必须像大师一样发挥作用,同时尽可能多地使用 TPL 来利用主机的内核。恕我直言,这需要向侦听器/可观察对象显式注册观察者。

PS:

我认为 Parallel.ForEach 不会为每个观察者创建一个线程:为什么 Parallel.ForEach 不运行多个线程?如果这是真的,我如何确保为每个观察者创建一个线程?

我想到的另一种选择是:

0 投票
1 回答
542 浏览

javascript - RXJS 和 Angular 1

我正在尝试使用 RxJS 和 Angular 1。我遇到了一个我无法调用或.done()请求的.fail()问题。是否可以将 RxJS 与Angular 1 中的服务一起使用?.always()$http$http

到目前为止,这就是我设置可观察对象的方式:

但是我遇到了一个$http.get.done不是函数的错误。我想在 Angular 1.4 中使用 RxJS 实现的目标是什么?

0 投票
1 回答
497 浏览

system.reactive - RX 故障的解决方法?

我正在各种平台上尝试响应式扩展,而让我有点恼火的一件事是故障。

即使对于 UI 代码,这些故障可能不是那么成问题,并且通常可以找到解决它们的操作符,但我仍然发现在存在故障的情况下调试代码更加困难:中间结果对于调试并不重要,但我的想法不知道结果何时是中间或“最终”。

在 Haskell 和同步数据流系统中使用纯功能 FRP 进行了一些工作后,它也“感觉”错了,但这当然是主观的。

但是当将 RX 连接到非 UI 执行器(如电机或开关)时,我认为故障问题更大。如何确保只有正确的值被发送到外部执行器?

也许这可以通过一些知道一些“外部传感器”何时触发启动事件的“调度程序”来解决,以便在将最终结果转发给执行器之前处理所有内部事件。类似于flajax论文中描述的东西。

我希望得到答案的问题是:

  1. RX 中是否存在无法修复同步通知故障的问题?
  2. 如果没有,RX 是否存在(最好是生产质量)库或方法来修复同步故障?特别是对于单线程 Javascript,这可能有意义吗?
  3. 如果不存在通用解决方案,如何使用 RX 来控制外部传感器/执行器而不会在执行器上出现故障?

让我举个例子

假设我想打印(a,b)合同所在的元组序列

n 是自然数流 = 0,1,2....

所以我期望以下序列

在 RX 中,为了让事情更有趣,我将使用过滤器来计算 b 流

这给出了我认为是一个小故障(暂时违反不变量/合同):

我意识到这是 CombineLatest 的正确行为,但我也认为这被称为故障,因为在真正的纯 FRP 系统中,您不会得到这些违反中间不变性的结果。

请注意,在此示例中,我将无法使用 Zip,而且 WithLatestFrom 也会给出不正确的结果。

当然,我可以将这个例子简化为一个单子计算,从不多播 n 个流事件(这意味着不能过滤而只能映射),但这不是重点:在 RX 中的 IMO 你总是得到一个“故障” ' 每当您拆分并重新加入可观察流时:

例如,在 FlapJAX 中,您不会遇到这些问题。

这有什么意义吗?

非常感谢,彼得

0 投票
1 回答
612 浏览

pagination - Angular2 和 ReactiveX 的分页思路

我正在学习 ReactiveX。我已经剪掉了错误检查、日志记录和其他部分,以使其更易于阅读。

我有一个以 JSON 形式返回对象集合的服务:

我的组件调用服务方法并将数据存储在一个数组中:

目标是在我的模板中分组显示这些数据:

现在我想添加分页并一次只显示三个或四个面板。

我的第一个想法是使用bufferCount来分组发出对象:

现在我有一个多维数组,所以我必须相应地更新组件:

我想我会有一个漂亮整洁的数组,每个索引都有三个集合成员。我错了,整个集合现在存储在data[0]中。接下来,我尝试将链向上切换一点:

。在这一点上,我显然需要有人来拯救我。看看我的 lambda!此时数据甚至不会一路流回组件。这是我开始思考的时候,也许我不需要学习如何使用 ReactiveX 方式来做到这一点……。

我的下一门课程是尝试使用 Angular 遍历这些值。我尝试在切片管道中使用一些变量:

尽管 Angular 2 仍处于测试阶段,但当解析器不断对我咆哮,因为我在不属于它们的地方使用运算符和表达式时,我可以看出我已经厌倦了。

我已经准备好从这些错误中吸取教训,这样我就可以做出更大的错误。有什么建议么?

[编辑]

我决定使用 ng2-pagination 因为它完全符合我的要求。但是,我不会将其作为答案发布,因为我仍然想尝试使用 rxjs 来实现它。

因此,如果您已经走到了这一步,并且只需要一些有效的东西,那么 ng2-pagination(在撰写本文时处于 beta 2 中)效果很好。

0 投票
2 回答
2513 浏览

reactive-programming - ReactiveX 被认为是反应式编程吗?

ReactiveX 介绍页面

它有时被称为“函数式反应式编程”,但这是用词不当。ReactiveX 可能是函数式的,也可能是响应式的,但“函数式响应式编程”是另一种动物。主要区别之一是函数式反应式编程对随时间连续变化的值进行操作,而 ReactiveX 对随时间发出的离散值进行操作。

同时,从Wikipedia 的功能响应式编程页面,ReactiveX 列在“实现”部分:

实现[编辑]

  • cellx,JavaScript 反应性的超快速实现
  • Elm,可编译为 HTML、CSS 和 JavaScript 的 FRP 语言
  • Frappuccino FRP 在 Ruby 中的实现
  • Flapjax,JavaScript 中的行为/事件 FRP 实现
  • Reactive.jl,Julia 中的 FRP 实现
  • ReactiveX、FRP 以多种语言实现,包括 Java、JavaScript、Python、Swift 等等
  • Haskell 中的响应式香蕉 FRP 实现
  • 用 Swift 和 Objective-C 实现的 ReactiveCocoa FRP
  • 用纯 Swift 实现的 ReactiveKit FRP
  • Haskell 中的 Reflex FRP 实现
  • Scala(和 Scala.js)中的 Scala.Rx FRP 实现
  • Sodium、C#、C++、Haskell(已弃用 [12])、Java、> Rust 和 Scala 中的 FRP 实现
  • Haskell中的Yampa FRP实现

我很了解 ReactiveX 是做什么的,也对“反应式编程”和“函数式反应式编程”做了一些研究,但我仍然无法区分它们之间的关系。

事实上,我有点相信 Wikipedia 页面用词不当,或者错误地在“实现”部分列出了示例,因为我知道cellxReactiveX(都在示例中列出)是为了解决完全不同的问题而构建的。

0 投票
2 回答
4285 浏览

rxjs - RXJS:聚合去抖动

我的用例如下:我收到事件,有时会突然发生。如果发生突发,我只需要处理一次。Debounce 做到了这一点。

然而,去抖动只给了我一个爆发的最后一个元素,但我需要知道一个爆发中的所有元素才能聚合它们(使用平面图)。

这可以通过定时窗口或缓冲区来完成,但是,这些是固定间隔,因此缓冲区/窗口超时可能发生在突发中间,因此将突发分成 2 部分来处理而不是 1 部分。

所以我想要的是