问题标签 [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
javascript - RxJS 先采取然后节流并等待
我想mousewheel
使用 RxJS-DOM 观察事件,以便当第一个事件触发时,我将其转发,然后删除任何和所有值,直到后续值之间的延迟超过先前指定的持续时间。
我想象的操作员可能看起来像:
想象以下数据流:
其中发送的值是数字,时间描述了下一个值到达所需的时间。由于 0 是第一个值,它将被发出,然后直到 3 的所有值都将被丢弃,因为后续值之间的各个延迟不大于500ms
.
与节流阀不同,无论是否发出最后接收的值,都会计算值之间的时间延迟。使用节流阀,将发送 0,经过 200 毫秒,评估 1 并失败,经过 400 毫秒,评估 2 并通过,因为上次发出的值 (0) 和当前接收到的值 (2 ) 是 600 毫秒,而对于我的操作员,它将相对于 1 进行评估,并且经过的时间将为 400 毫秒,因此未通过测试。
而且这个算子也没有去抖动。它不是等到间隔过去才发出,而是首先发送第一个值,然后根据所有未来值进行评估,依此类推。
这样的运营商是否已经存在?如果没有,我将如何制作一个?
android - 加速度计 RxJava 背压
背压有问题。使用发布主题在发出时获取传感器事件,并且需要在事务中订阅主题时将数据保存到数据库。
我一直在尝试使用 .window(100) 运算符,因此每当我连续获得 100 个传感器事件时,我都可以批量插入,但我只能在 .subscribe() 获得一项
不想通过使用缓冲区运算符来删除事件。处理这个问题的正确方法是什么?
spring - 用于数据库请求的 Java8 Stream 或 Reactive / Observer
我正在重新考虑我们的 Spring MVC 应用程序行为,无论是从数据库中提取(Java8 Stream)数据还是让数据库推送(Reactive / Observable)数据并使用背压来控制数量更好。
现在的情况:
User
请求最近的 30 篇文章Service
进行数据库查询并将 30 个结果放入List
Jackson
迭代List
并生成 JSON 响应
为什么要切换实现?
这非常消耗内存,因为我们一直将这 30 个对象保存在内存中。这不是必需的,因为应用程序一次处理一个对象。尽管应用程序应该能够检索一个对象、对其进行处理、将其丢弃并获取下一个对象。
Java8 流?(拉)
这java.util.Stream
很容易:Service
创建一个Stream
,它在幕后使用数据库游标。并且每次Jackson
为 的一个元素写入 JSON 字符串时Stream
,它会请求下一个元素,然后触发数据库游标返回下一个条目。
RxJava / 反应式 / 可观察?(推)
这里我们有相反的情况:数据库必须逐个推送条目,并且Jackson
必须为每个元素创建 JSON 字符串,直到onComplete
调用该方法。
即Controller
告诉Service
:给我一个Observable<Article>
。然后Jackson
可以请求尽可能多的数据库条目,因为它可以处理。
分歧和担忧:
在请求下一个数据库条目和检索/处理它Streams
之间总是有一些延迟。如果网络连接速度较慢或必须发出大量数据库请求来完成响应,这可能会减慢 JSON 响应时间。
使用RxJava
应该始终有可用于处理的数据。如果它太多,我们可以使用背压来减慢从数据库到应用程序的数据传输。在最坏的情况下,缓冲区/队列将包含所有请求的数据库条目。那么内存消耗将等于我们当前使用List
.
我为什么要问/我要什么?
我错过了什么?还有其他优点/缺点吗?
Stream
如果每个数据库请求/响应之间总是存在(短)延迟,为什么(特别是)Spring Data Team 扩展他们的 API 以支持来自数据库的响应?对于大量请求的条目,这可能会导致一些明显的延迟。是否建议在
RxJava
这种情况下使用(或其他一些反应式实现)?还是我错过了任何缺点?
asynchronous - mapcat 破坏 core.async 中的背压时,内存泄漏在哪里?
我在 Clojure 中编写了一些 core.async 代码,当我运行它时,它消耗了所有可用内存并因错误而失败。似乎mapcat
在 core.async 管道中使用会破坏背压。(由于超出此问题范围的原因,这是不幸的。)
下面是一些代码,通过计算ing 转换器:x
的进出数来演示该问题:mapcat
生产者远远领先于消费者。
看来我不是第一个发现这一点的人。但是这里给出的解释似乎并没有完全涵盖它。(尽管它确实提供了一个足够的解决方法。)从概念上讲,我希望生产者领先,但只是通道中可能缓冲的少数消息的长度。
我的问题是,所有其他消息在哪里?到第四行输出 7000 :x
s 下落不明。
reactive-programming - 如何设计具有扇出能力的反应式流演员系统
我正在尝试实现一个具有背压功能的基于参与者的系统。作为要求,主进程接收 JSON 格式的流数据。然而,每个 JSON 事件都有几个字段,例如 {ip: '123.43.12.1', country: 'US', ... etc}。JSON的结构是事先知道的。
现在,我必须以某种方式将 JSON 结构扁平化为 (key, value) 对。例如,上面的数据可以被展平成(ip, freq), (country, freq),其中freq是一个ip(比如'123.43.12.1')在数据流中出现的次数。
一种非常自然的方法是将每个(键,值)对转发给相应的子/远程参与者以进行进一步评估。例如 ('123.43.12.1', 1) 发送给 IP-Actor;('US', 1) 被发送到 Country-Actor 等等。
我想确保整个系统都处于背压状态。在这种情况下事情会更加复杂,因为事件 {ip: '123.43.12.1', country: 'US'} 只有在 IP-Actor 和 Country-Actor 都完成了扁平化对 ('123.43. 12.1', 1), ('美国', 1)。每个参与者可能有不同的处理速度(例如,IP-Actor 比 Country-Actor 快得多)。在这种情况下,我希望接收流的主进程将等待/阻塞,直到有需求信号(当两个参与者完成处理其邮箱中的现有数据时发生)。否则,某些参与者可能会在邮箱中充满消息(国家参与者 - 慢一个),但由于其他参与者邮箱是空的(IP-参与者 - 较快的一个),消息仍然会进来。
任何人都可以建议反应流规范是否提供这样的功能。如果没有,无论如何以最有效的方式实现功能。
谢谢。
javascript - 如何一次处理 RxJS 流 n 个项目,一旦项目完成,再次自动填充回 n?
我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件。
这个卵石图可能是错误的,但这是我想要的:
这是我到目前为止的代码:
这段代码有两个问题:
- 它使用了
inProgressCount
用副作用函数更新的 var。 当我从受控流中请求超过 1 个项目时,仅调用一次 done$ 订阅。这使得inProgressCount
var 更新不正确,这最终将队列限制为一次一个。
你可以在这里看到它工作:http: //jsbin.com/wivehonifi/1/edit ?js,console,output
问题:
- 有更好的方法吗?
- 我怎样才能摆脱
inProgressCount
变量? 为什么在请求多个项目时只调用一次 done$ 订阅?
更新:
对问题 #3 的回答:switchMap 与 flatMapLatest 相同,这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。
java - 在 RxJava 中处理背压而不丢弃项目或序列化
简而言之,是否有任何解决方案可以解决 RxJava 中的背压,而无需借助丢弃项目、序列化操作或无界缓冲?
考虑以下任务作为何时有用的示例。
- 将数据从磁盘读入内存
- 压缩数据
- 通过网络传输压缩数据
直接的方法是在单个后台线程上按顺序执行所有任务,如下所示:
虽然这没有问题,但从性能的角度来看,它是次优的,因为运行时是所有三个操作的总和,而不是并行运行时它们中的最大值:
但是,如果从磁盘读取数据的速度快于压缩和传输的速度,这可能会由于背压而失败。通常的背压解决方案是不可取的,原因如下:
- 丢件:文件必须完整传输,不得遗漏
- 在单线程上序列化:流水线的性能提升丢失了
- 调用栈阻塞:在 RxJava 中不支持
- 增加observeOn buffers:内存消耗可能会变成文件大小的几倍
- 在没有 MissingBackpressureException 的情况下重新实现 observeOn:大量工作并破坏了流畅的 API
还有其他解决方案吗?还是这根本不适合 ReactiveX 可观察模型?
javascript - 将 RxJS v4 代码转换为 v5,使用“拉”处理队列
我有一个我想观察/订阅的“工作队列”。这是要处理的命令对象数组。新的工作项目通常会以突发的方式到达,并且需要按顺序进行处理(按收到的顺序,一次一个,直到完全处理)。
我正在使用 RxJS 5.0.0-beta.6。(其他图书馆强加的版本)
这是一个工作示例,说明了我想要的行为,但使用了 RxJS v4。
有问题的主要代码是这个......
http://jsbin.com/meyife/edit?js,输出
鉴于 API 的当前 beta 状态和不完整/更改的文档,我无法弄清楚如何在 RxJS 5 中执行此操作。
更新:这个从 v4 迁移到 v5 的迁移指南显示了许多被删除的功能,但没有指导如何以新的方式做事。已移除操作的示例:.tap、.controlled、.flatMapWithMaxConcurrent(重命名)。
java - 背压在 RxJava 内部是如何发生的
我一直在阅读一些关于 RxJava 中背压的文档,但我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结它,比如“生产者”太快而“消费者”太慢。
例如像下面的代码:
我一直在浏览 RxJava 源代码,所以我的理解是在主线程中,我们将每毫秒发出事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并将其扔到newThead 调度程序的线程池并在可运行对象中运行该方法。
所以我的问题是,异常是如何在内部发生的?因为我们在调用Thread.sleep()的时候,只是休眠了处理方法调用的线程-> System.out.println(),没有影响线程池中的其他线程,怎么会导致异常。是不是因为线程池没有足够的可用线程了?
谢谢