问题标签 [reactive-extensions-js]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1364 浏览

javascript - rx:将数组展开到多个流

我有一个包含一个数组的流,其中每个元素都有一个 id。我需要将其拆分为每个 id 的流,当源流不再携带 id 时,该流将完成。

例如具有这三个值的输入流序列

应该返回三个流

其中 a 在第 3 个值完成,因为它的 id 已经消失,而 c 在第 2 个值创建,因为它的 id 已经出现。

我正在尝试 groupByUntil,有点像

因此,按 id 分组,并在输入流不再具有 id 时处理组。这似乎可行,但输入的两种用途对我来说看起来很奇怪,就像使用单个流来控制 groupByUntil 的输入和组的处置时可能存在奇怪的顺序依赖性。

有没有更好的办法?

更新

确实,这里有一个奇怪的时间问题。fromArray 默认使用 currentThread 调度程序,这将导致来自该数组的事件与来自输入的事件交错。然后在错误的时间(在处理来自先前输入的组之前)评估组上的处置条件。

一种可能的解决方法是使用 fromArray(.., rx.Scheduler.immediate),这将使分组事件与输入保持同步。

0 投票
1 回答
2732 浏览

javascript - 如何构建 rxjs 代码

如何构建一个 rxjs 应用程序?大约有一百个玩具介绍示例,但没有一个完整的应用程序示例,带有小部件、子小部件等,显示整个应用程序的数据流。

例如,假设您有一个带有某种状态的可观察对象。您需要将其传递给小部件。该小部件具有需要该状态部分的子小部件。你订阅了吗?

现在“小部件”在单子之外。子小部件不能对状态使用可观察的方法。如果您将小部件作为副作用运行,则会遇到同样的问题。

那么你是否将流传递给小部件?如果是这样,你会得到什么?

小部件是否订阅状态并返回一次性?它是否返回从状态派生的流?如果有,里面有什么?您是否尝试通过广泛使用 selectMany(identity) 将所有小部件/子小部件/子子小部件中的所有流收集在一起,以获得您订阅的最终应用程序流以启动整个过程?

如果小部件根据状态按需创建子小部件,小部件如何管理其子小部件?我一直在尝试使用 groupBy() 的解决方案,每个子小部件都有一个组,但是管理从嵌套的 observable 返回的所有订阅或流是一个令人难以置信的噩梦。

即使是整个应用程序的一个示例也会有所帮助。

0 投票
1 回答
253 浏览

reactive-extensions-js - 使用可观察对象作为承诺的示例

有没有人有一个例子来说明如何使用 RX observables 来帮助异步代码的外观?

我正在寻找 promises 的替代方案,我想看看它是如何在 rx 中完成的。

0 投票
0 回答
505 浏览

javascript - rx 数据驱动子小部件

跟进如何构建 rxjs 代码,关于在使用 rx 时如何使用子小部件构建小部件,在子小部件由数据驱动的情况下,您将如何构建 rx 代码?

作为一个玩具问题,假设您有一个外部源(例如 Web 服务)流式传输要观看的股票列表,包括价值、高、低等。所以您有一个 Observable 流,例如(时间下降):

您需要为每个股票代码构建一个小部件。所以必须修改上一个问题中的模式。没有一个孩子,而是根据输入流创建/销毁一个孩子,例如,我们在第一个事件中为“csco”创建一个孩子。在第 2 次事件中,“csco”孩子获得更新(低:8),我们为“aapl”创建了一个孩子。

您可以将子小部件创建移动到订阅:

这引入了一个排序问题:孩子没有得到导致它被创建的事件。您可以通过执行类似state.repeat(1), 或的操作来解决此问题state.startsWith(s).select(...),但它看起来有点奇怪。

假设孩子们也在返回溪流。例如,也许用户操纵孩子们来模拟不同的位置,所以我们想从孩子们那里得到一些计算,并在小部件中显示一个总体总数或其他指标。

您是否在 widget() 中创建了一个主题并将来自孩子的流推送到它上面?喜欢

这一切看起来都很笨拙。有没有更好的组织方式?我尝试将其建模为一个流,每个孩子一个,并合并输出。这效果不佳。很难跟踪内部订阅,或者从子级输出多个流。

0 投票
1 回答
1976 浏览

javascript - how to avoid glitches in Rx

Unlike other "FRP" libraries, Rx doesn't prevent glitches: callbacks invoked with time-mismatched data. Is there a good way to work around this?

As an example, imagine that we have a series of expensive computations derived from a single stream (e.g. instead of _.identity, below, we do a sort, or an ajax fetch). We do distinctUntilChanged to avoid recomputing the expensive things.

The second event will end up causing a number of glitchy states: we get three events out, instead of one, which wastes a bunch of cpu and requires us to explicitly work around the mismatched data.

This particular example can be worked around by dropping the distinctUntilChanged, and writing some wonky scan() functions to pass through the previous result if the input hasn't changed. Then you can zip the results, instead of using combineLatest. It's clumsy, but doable.

However if there is asynchrony anywhere, e.g. an ajax call, then zip doesn't work: the ajax call will complete either synchronously (if cached) or asynchronously, so you can't use zip.

Edit

Trying to clarify the desired behavior with a simpler example:

You have two streams, a and b. b depends on a. b is asynchronous, but the browser may cache it, so it can either update independently of a, or at the same time as a. So, a particular event in the browser can cause one of three things: a updates; b updates; both a and b update. The desired behavior is to have a callback (e.g. render method) invoked exactly once in all three cases.

zip does not work, because when a or b fires alone, we get no callback from zip. combineLatest does not work because when a and b fire together we get two callbacks.

0 投票
1 回答
8434 浏览

reactive-extensions-js - 将 RxJS Observable 收集到数组

我想使用 RxJS 将异步事件世界与同步世界“桥接”起来。具体来说,我想创建一个函数,该函数返回在某个时间间隔内收集的事件数组。

我可以创建 Observable 来做我想做的事

我可以打印正确的值就好了

这打印

但我想要的是将此数组分配给变量。从概念上讲,我想要类似的东西

var collectedDuringSecond = source.toPromise.getValue()

这个想法是 getValue 会阻塞,所以在上面的行完成之后,collectedDuringSecond 将包含 [0,1,2,3,4,5,6,7,8]

0 投票
1 回答
325 浏览

reactive-extensions-js - RxJS 可变长度窗口

我想知道如何创建 windowWithMaxCount 的效果,它可以像 windowWithCount 一样工作,但窗口大小会从 1 变为 maxCount。

我正在做的是基于 c 事件流绘制折线图。折线图需要 50 个点的数组。当新点到达时,我需要在右边推出一个点并将这个新点放在左边。

所以一般 observable.windowWithCount(50,1) 就是这样做的。对于第一个窗口,唯一的问题是我必须等到所有 50 个元素都可用。在此期间,用户在屏幕上什么也看不到。

相反,我想要发生的是,一旦第一个点到达,我想获得大小为 1 的窗口,然后是大小为 2 的窗口等,直到我获得大小为 50(maxCount)的窗口。此时所有后续窗口的大小将是 50。

屏幕上的效果是线条从左到右填充屏幕,直到感觉整个屏幕。

0 投票
2 回答
1355 浏览

observable - 创建一个延迟下一个值的 Observable

我正在尝试使用 RxJS 创建一个可观察的对象,它可以执行如图所示的操作。

预期的可观察映射

  • 获取一个值并在获取下一个值之前等待一段固定的时间。
  • 下一个将是等待期间发出的最后一个值,跳过其余的。
  • 如果在没有发出任何值的情况下经过等待间隔,则应立即抓取下一个,如图像的最后一个示例所示。
0 投票
3 回答
1905 浏览

system.reactive - 如何用 observables 定义循环

我正在尝试设置一个简单游戏的更新循环,并考虑到 observables。顶层组件是一个模型,它接受输入命令并产生更新;和一个view,它显示接收到的更新,并产生输入。单独来看,两者都可以正常工作,有问题的部分是将两者放在一起,因为两者都依赖于另一个。

将组件简化为以下内容:

我把事情联系在一起的方式是这样的:

也就是说,我添加一个主题作为输入流的占位符,并将模型附加到该主题。然后,在构建视图之后,我将实际输入传递给占位符主题,从而关闭循环。

然而,我不禁觉得这不是正确的做事方式。为此使用主题似乎是矫枉过正。有没有办法用 publish() 或 defer() 或类似的东西做同样的事情?

更新:这是一个不太抽象的例子来说明我遇到的问题。下面是一个简单“游戏”的代码,玩家需要点击一个目标才能击中它。目标可以出现在左侧或右侧,每当它被击中时,它就会切换到另一侧。看起来很简单,但我仍然觉得我错过了一些东西......

0 投票
1 回答
6309 浏览

javascript - RxJS 在多个输出中拆分可观察序列

是否可以将单个可观察通量拆分为多个其他可观察通量?

我的用例是用户可以提交的表单。提交动作在一个可观察对象中处理,在这个动作上,有一个验证器在监听。

问题是我想将操作绑定到验证器检查的successfailure

我不确定在反应式编程中如何处理类似的情况 - 可能拆分 observable 并不是处理此类问题的正确解决方案。

无论如何,您将如何处理类似的情况?