问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2159 浏览

javascript - RxJS 先采取然后节流并等待

我想mousewheel使用 RxJS-DOM 观察事件,以便当第一个事件触发时,我将其转发,然后删除任何和所有值,直到后续值之间的延迟超过先前指定的持续时间。

我想象的操作员可能看起来像:

想象以下数据流:

其中发送的值是数字,时间描述了下一个值到达所需的时间。由于 0 是第一个值,它将被发出,然后直到 3 的所有值都将被丢弃,因为后续值之间的各个延迟不大于500ms.

与节流阀不同,无论是否发出最后接收的值,都会计算值之间的时间延迟。使用节流阀,将发送 0,经过 200 毫秒,评估 1 并失败,经过 400 毫秒,评估 2 并通过,因为上次发出的值 (0) 和当前接收到的值 (2 ) 是 600 毫秒,而对于我的操作员,它将相对于 1 进行评估,并且经过的时间将为 400 毫秒,因此未通过测试。

而且这个算子也没有去抖动。它不是等到间隔过去才发出,而是首先发送第一个值,然后根据所有未来值进行评估,依此类推。

这样的运营商是否已经存在?如果没有,我将如何制作一个?

0 投票
1 回答
294 浏览

android - 加速度计 RxJava 背压

背压有问题。使用发布主题在发出时获取传感器事件,并且需要在事务中订阅主题时将数据保存到数据库。

我一直在尝试使用 .window(100) 运算符,因此每当我连续获得 100 个传感器事件时,我都可以批量插入,但我只能在 .subscribe() 获得一项

不想通过使用缓冲区运算符来删除事件。处理这个问题的正确方法是什么?

0 投票
1 回答
633 浏览

spring - 用于数据库请求的 Java8 Stream 或 Reactive / Observer

我正在重新考虑我们的 Spring MVC 应用程序行为,无论是从数据库中提取(Java8 Stream)数据还是让数据库推送(Reactive / Observable)数据并使用背压来控制数量更好。


现在的情况:

  1. User请求最近的 30 篇文章
  2. Service进行数据库查询并将 30 个结果放入List
  3. Jackson迭代List并生成 JSON 响应

为什么要切换实现?

这非常消耗内存,因为我们一直将这 30 个对象保存在内存中。这不是必需的,因为应用程序一次处理一个对象。尽管应用程序应该能够检索一个对象、对其进行处理、将其丢弃并获取下一个对象


Java8 流?(拉)

java.util.Stream很容易:Service创建一个Stream,它在幕后使用数据库游标。并且每次Jackson为 的一个元素写入 JSON 字符串时Stream,它会请求下一个元素,然后触发数据库游标返回下一个条目。


RxJava / 反应式 / 可观察?(推)

这里我们有相反的情况:数据库必须逐个推送条目,并且Jackson必须为每个元素创建 JSON 字符串,直到onComplete调用该方法。

Controller告诉Service给我一个Observable<Article>。然后Jackson可以请求尽可能多的数据库条目,因为它可以处理。


分歧和担忧:

请求下一个数据库条目和检索/处理它Streams之间总是有一些延迟。如果网络连接速度较慢或必须发出大量数据库请求来完成响应,这可能会减慢 JSON 响应时间。

使用RxJava应该始终有可用于处理的数据。如果它太多,我们可以使用背压来减慢从数据库到应用程序的数据传输。在最坏的情况下,缓冲区/队列将包含所有请求的数据库条目。那么内存消耗将等于我们当前使用List.


我为什么要问/我要什么?

  • 我错过了什么?还有其他优点/缺点吗?

  • Stream如果每个数据库请求/响应之间总是存在(短)延迟,为什么(特别是)Spring Data Team 扩展他们的 API 以支持来自数据库的响应?对于大量请求的条目,这可能会导致一些明显的延迟。

  • 是否建议在RxJava这种情况下使用(或其他一些反应式实现)?还是我错过了任何缺点?

0 投票
1 回答
345 浏览

asynchronous - mapcat 破坏 core.async 中的背压时,内存泄漏在哪里?

我在 Clojure 中编写了一些 core.async 代码,当我运行它时,它消耗了所有可用内存并因错误而失败。似乎mapcat在 core.async 管道中使用会破坏背压。(由于超出此问题范围的原因,这是不幸的。)

下面是一些代码,通过计算ing 转换器:x的进出数来演示该问题:mapcat

生产者远远领先于消费者。

看来我不是第一个发现这一点的人。但是这里给出的解释似乎并没有完全涵盖它。(尽管它确实提供了一个足够的解决方法。)从概念上讲,我希望生产者领先,但只是通道中可能缓冲的少数消息的长度。

我的问题是,所有其他消息在哪里?到第四行输出 7000 :xs 下落不明。

0 投票
1 回答
336 浏览

reactive-programming - 如何设计具有扇出能力的反应式流演员系统

我正在尝试实现一个具有背压功能的基于参与者的系统。作为要求,主进程接收 JSON 格式的流数据。然而,每个 JSON 事件都有几个字段,例如 {ip: '123.43.12.1', country: 'US', ... etc}。JSON的结构是事先知道的。

现在,我必须以某种方式将 JSON 结构扁平化为 (key, value) 对。例如,上面的数据可以被展平成(ip, freq), (country, freq),其中freq是一个ip(比如'123.43.12.1')在数据流中出现的次数。

一种非常自然的方法是将每个(键,值)对转发给相应的子/远程参与者以进行进一步评估。例如 ('123.43.12.1', 1) 发送给 IP-Actor;('US', 1) 被发送到 Country-Actor 等等。

我想确保整个系统都处于背压状态。在这种情况下事情会更加复杂,因为事件 {ip: '123.43.12.1', country: 'US'} 只有在 IP-Actor 和 Country-Actor 都完成了扁平化对 ('123.43. 12.1', 1), ('美国', 1)。每个参与者可能有不同的处理速度(例如,IP-Actor 比 Country-Actor 快得多)。在这种情况下,我希望接收流的主进程将等待/阻塞,直到有需求信号(当两个参与者完成处理其邮箱中的现有数据时发生)。否则,某些参与者可能会在邮箱中充满消息(国家参与者 - 慢一个),但由于其他参与者邮箱是空的(IP-参与者 - 较快的一个),消息仍然会进来。

任何人都可以建议反应流规范是否提供这样的功能。如果没有,无论如何以最有效的方式实现功能。

谢谢。

0 投票
2 回答
373 浏览

rxjs5 - 如何在 RxJS 5 (beta) 中跳过过于频繁的鼠标事件?

我在一个项目中使用 RxJS 5 (beta10) 。我的大部分事件都来自鼠标或触摸拖动,我只需要听到最后一个。

即我如何在 RxJS5 中做一个有损流?

在某些平板电脑上运行演示清楚地显示了当图形更新花费的时间超过浏览器的事件触发时发生的“拖拽”(这不是连续的,甚至它本身 - 浏览器似乎在他们实际告诉事情发生变化的时间间隔上进行了优化)。

通过有损数据流,我可以让用户体验始终跟上手指的步伐。


编辑:

这个答案似乎很好地总结了 RxJS5 背压的当前状态。

0 投票
1 回答
685 浏览

javascript - 如何一次处理 RxJS 流 n 个项目,一旦项目完成,再次自动填充回 n?

我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件。

这个卵石图可能是错误的,但这是我想要的:

这是我到目前为止的代码:

这段代码有两个问题:

  1. 它使用了inProgressCount用副作用函数更新的 var。
  2. 当我从受控流中请求超过 1 个项目时,仅调用一次 done$ 订阅。这使得inProgressCountvar 更新不正确,这最终将队列限制为一次一个。

你可以在这里看到它工作:http: //jsbin.com/wivehonifi/1/edit ?js,console,output

问题:

  1. 有更好的方法吗?
  2. 我怎样才能摆脱inProgressCount变量?
  3. 为什么在请求多个项目时只调用一次 done$ 订阅?

更新:
对问题 #3 的回答:switchMap 与 flatMapLatest 相同,这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。

0 投票
1 回答
394 浏览

java - 在 RxJava 中处理背压而不丢弃项目或序列化

简而言之,是否有任何解决方案可以解决 RxJava 中的背压,而无需借助丢弃项目、序列化操作或无界缓冲?

考虑以下任务作为何时有用的示例。

  1. 将数据从磁盘读入内存
  2. 压缩数据
  3. 通过网络传输压缩数据

直接的方法是在单个后台线程上按顺序执行所有任务,如下所示:

虽然这没有问题,但从性能的角度来看,它是次优的,因为运行时是所有三个操作的总和,而不是并行运行时它们中的最大值:

但是,如果从磁盘读取数据的速度快于压缩和传输的速度,这可能会由于背压而失败。通常的背压解决方案是不可取的,原因如下:

  1. 丢件:文件必须完整传输,不得遗漏
  2. 在单线程上序列化:流水线的性能提升丢失了
  3. 调用栈阻塞:在 RxJava 中不支持
  4. 增加observeOn buffers:内存消耗可能会变成文件大小的几倍
  5. 在没有 MissingBackpressureException 的情况下重新实现 observeOn:大量工作并破坏了流畅的 API

还有其他解决方案吗?还是这根本不适合 ReactiveX 可观察模型?

0 投票
1 回答
181 浏览

javascript - 将 RxJS v4 代码转换为 v5,使用“拉”处理队列

我有一个我想观察/订阅的“工作队列”。这是要处理的命令对象数组。新的工作项目通常会以突发的方式到达,并且需要按顺序进行处理(按收到的顺序,一次一个,直到完全处理)。

我正在使用 RxJS 5.0.0-beta.6。(其他图书馆强加的版本)

这是一个工作示例,说明了我想要的行为,但使用了 RxJS v4。

有问题的主要代码是这个......

http://jsbin.com/meyife/edit?js,输出

鉴于 API 的当前 beta 状态和不完整/更改的文档,我无法弄清楚如何在 RxJS 5 中执行此操作。

更新:这个从 v4 迁移到 v5 的迁移指南显示了许多被删除的功能,但没有指导如何以新的方式做事。已移除操作的示例:.tap、.controlled、.flatMapWithMaxConcurrent(重命名)。

0 投票
1 回答
410 浏览

java - 背压在 RxJava 内部是如何发生的

我一直在阅读一些关于 RxJava 中背压的文档,但我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结它,比如“生产者”太快而“消费者”太慢。

例如像下面的代码:

我一直在浏览 RxJava 源代码,所以我的理解是在主线程中,我们将每毫秒发出事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并将其扔到newThead 调度程序的线程池并在可运行对象中运行该方法。

所以我的问题是,异常是如何在内部发生的?因为我们在调用Thread.sleep()的时候,只是休眠了处理方法调用的线程-> System.out.println(),没有影响线程池中的其他线程,怎么会导致异常。是不是因为线程池没有足够的可用线程了?

谢谢