问题标签 [rxjs]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5243 浏览

javascript - RxJS 中的同步性

我希望以下代码会异步运行:

但事实并非如此。通过大范围的数字需要一段时间,只有在完成后才能恢复执行,您可以在这里尝试代码。

我对何时期望 RxJS 同步或异步运行感到困惑。这取决于使用的方法吗?我之前的想法是,一旦我们进入 Observables/Observer 领域,其中的一切都是异步运行的,类似于 Promise 的工作方式。

0 投票
2 回答
469 浏览

javascript - 在 rx 中创建资源的副作用(反应式扩展)

rx 指南说尽可能避免副作用,如果它们不可避免,则将它们放在 do()(js 中的 doAction)子句中。

然而,在 UI 中一个非常常见的副作用是创建一些资源(比如 <div>)被下游引用(由子小部件)。您必须捕获这些资源的句柄,以便传递它们。例如,如果您有一个数据数组,每个数据都需要一个 div,您将为每个数据创建一个 div,并将这些 div 的句柄传递给子项。

但是 doAction() 会丢弃副作用的返回值,因此您无法捕获已创建对象的句柄。您必须在 select() 中执行副作用。

我看这一切都错了吗?创建的资源是状态,并且具有副作用。你想要流中的状态,但是你不能把它放在流中而不把副作用放在 select() 中,这是禁忌的。

0 投票
11 回答
20419 浏览

javascript - 在 RxJS 中按特定时间量分隔可观察值

在特定时间内产生 Observable 值的最惯用的方法是什么?例如,假设我从一个大数组创建了一个 Observable,我想每 2 秒产生一个值。是最好的结合interval方式selectMany吗?

0 投票
2 回答
8974 浏览

node.js - 试图让我自己的 RxJs 可观察

我正在尝试将现有的 API 转换为与 RxJS 一起使用......对节点来说相当新,对 RxJs 来说非常新,所以请多多包涵。

我有一个现有的 API(getNextMessage),当某些东西可用时,它要么阻塞(异步),要么通过节点样式(err,val)回调返回新项目或错误。

所以它看起来像:

getNextMessage(nodeStyleCompletionCallback);

您可以将 getNextMessage 想象成一个 http 请求,该请求在未来服务器响应时完成,但您确实需要在收到消息后再次调用 getNextMessage 以继续从服务器获取新项目。

所以,为了使它成为一个可观察的集合,我必须让 RxJs 继续调用我的 getNextMessage 函数,直到订阅者被释放();

基本上,我正在尝试创建自己的 RxJs 可观察集合。

问题是:

  1. 我不知道如何让subscriber.dispose() 杀死async.forever
  2. 我可能不应该首先使用 async.forever
  3. 我不确定我是否应该为每条消息“完成” - 那不应该在序列的末尾
  4. 我想最终消除使用 fromNodeCallback 的需要,以拥有一流的 RxJS 可观察
  5. 显然我有点困惑。

希望能得到一点帮助,谢谢!

这是我现有的代码:

0 投票
1 回答
1364 浏览

javascript - rx:将数组展开到多个流

我有一个包含一个数组的流,其中每个元素都有一个 id。我需要将其拆分为每个 id 的流,当源流不再携带 id 时,该流将完成。

例如具有这三个值的输入流序列

应该返回三个流

其中 a 在第 3 个值完成,因为它的 id 已经消失,而 c 在第 2 个值创建,因为它的 id 已经出现。

我正在尝试 groupByUntil,有点像

因此,按 id 分组,并在输入流不再具有 id 时处理组。这似乎可行,但输入的两种用途对我来说看起来很奇怪,就像使用单个流来控制 groupByUntil 的输入和组的处置时可能存在奇怪的顺序依赖性。

有没有更好的办法?

更新

确实,这里有一个奇怪的时间问题。fromArray 默认使用 currentThread 调度程序,这将导致来自该数组的事件与来自输入的事件交错。然后在错误的时间(在处理来自先前输入的组之前)评估组上的处置条件。

一种可能的解决方法是使用 fromArray(.., rx.Scheduler.immediate),这将使分组事件与输入保持同步。

0 投票
3 回答
23821 浏览

javascript - 在 RxJS Observable 的 onNext 中等待异步操作

我有一个以正常方式使用的 RxJS 序列...

但是,在可观察的“onNext”处理程序中,一些操作将同步完成,但其他操作需要异步回调,在处理输入序列中的下一项之前需要等待。

...有点困惑如何做到这一点。有任何想法吗?谢谢!

0 投票
1 回答
2732 浏览

javascript - 如何构建 rxjs 代码

如何构建一个 rxjs 应用程序?大约有一百个玩具介绍示例,但没有一个完整的应用程序示例,带有小部件、子小部件等,显示整个应用程序的数据流。

例如,假设您有一个带有某种状态的可观察对象。您需要将其传递给小部件。该小部件具有需要该状态部分的子小部件。你订阅了吗?

现在“小部件”在单子之外。子小部件不能对状态使用可观察的方法。如果您将小部件作为副作用运行,则会遇到同样的问题。

那么你是否将流传递给小部件?如果是这样,你会得到什么?

小部件是否订阅状态并返回一次性?它是否返回从状态派生的流?如果有,里面有什么?您是否尝试通过广泛使用 selectMany(identity) 将所有小部件/子小部件/子子小部件中的所有流收集在一起,以获得您订阅的最终应用程序流以启动整个过程?

如果小部件根据状态按需创建子小部件,小部件如何管理其子小部件?我一直在尝试使用 groupBy() 的解决方案,每个子小部件都有一个组,但是管理从嵌套的 observable 返回的所有订阅或流是一个令人难以置信的噩梦。

即使是整个应用程序的一个示例也会有所帮助。

0 投票
0 回答
505 浏览

javascript - rx 数据驱动子小部件

跟进如何构建 rxjs 代码,关于在使用 rx 时如何使用子小部件构建小部件,在子小部件由数据驱动的情况下,您将如何构建 rx 代码?

作为一个玩具问题,假设您有一个外部源(例如 Web 服务)流式传输要观看的股票列表,包括价值、高、低等。所以您有一个 Observable 流,例如(时间下降):

您需要为每个股票代码构建一个小部件。所以必须修改上一个问题中的模式。没有一个孩子,而是根据输入流创建/销毁一个孩子,例如,我们在第一个事件中为“csco”创建一个孩子。在第 2 次事件中,“csco”孩子获得更新(低:8),我们为“aapl”创建了一个孩子。

您可以将子小部件创建移动到订阅:

这引入了一个排序问题:孩子没有得到导致它被创建的事件。您可以通过执行类似state.repeat(1), 或的操作来解决此问题state.startsWith(s).select(...),但它看起来有点奇怪。

假设孩子们也在返回溪流。例如,也许用户操纵孩子们来模拟不同的位置,所以我们想从孩子们那里得到一些计算,并在小部件中显示一个总体总数或其他指标。

您是否在 widget() 中创建了一个主题并将来自孩子的流推送到它上面?喜欢

这一切看起来都很笨拙。有没有更好的组织方式?我尝试将其建模为一个流,每个孩子一个,并合并输出。这效果不佳。很难跟踪内部订阅,或者从子级输出多个流。

0 投票
1 回答
1686 浏览

javascript - RXJS 在 html5 画布上画线

我正在尝试使用 Reactive Extensions for Javascript (RX-JS) 来达到我在此处发布的相同效果。我对如何做到这一点有点困惑。这是页面:

我想我应该为 mouseDown、mouseMove 和 mouseUp 事件创建 observables。

但我不知道如何将它们结合起来。我认为应该开始观察 mousedown,然后收集所有移动,直到 mouseup 被提升,并同时重新绘制从起点到鼠标在 mousemove 期间鼠标所在的当前点的线。你有什么想法?非常感谢。

°°°°°°°°°°°°°°°°°°°°°°°编辑°°°°°°°°°°°°°°°°°°°°°°°°°°°° °°°°°°°°°°

这是布兰登回答后的代码:

0 投票
1 回答
1976 浏览

javascript - how to avoid glitches in Rx

Unlike other "FRP" libraries, Rx doesn't prevent glitches: callbacks invoked with time-mismatched data. Is there a good way to work around this?

As an example, imagine that we have a series of expensive computations derived from a single stream (e.g. instead of _.identity, below, we do a sort, or an ajax fetch). We do distinctUntilChanged to avoid recomputing the expensive things.

The second event will end up causing a number of glitchy states: we get three events out, instead of one, which wastes a bunch of cpu and requires us to explicitly work around the mismatched data.

This particular example can be worked around by dropping the distinctUntilChanged, and writing some wonky scan() functions to pass through the previous result if the input hasn't changed. Then you can zip the results, instead of using combineLatest. It's clumsy, but doable.

However if there is asynchrony anywhere, e.g. an ajax call, then zip doesn't work: the ajax call will complete either synchronously (if cached) or asynchronously, so you can't use zip.

Edit

Trying to clarify the desired behavior with a simpler example:

You have two streams, a and b. b depends on a. b is asynchronous, but the browser may cache it, so it can either update independently of a, or at the same time as a. So, a particular event in the browser can cause one of three things: a updates; b updates; both a and b update. The desired behavior is to have a callback (e.g. render method) invoked exactly once in all three cases.

zip does not work, because when a or b fires alone, we get no callback from zip. combineLatest does not work because when a and b fire together we get two callbacks.