问题标签 [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
node.js - rxjs mongo 流背压
最近我遇到了来自 mongo-cursor-readable 的 rx.js 的背压问题。任务基本上是:
- 查询 DB A (Mongo),使用它的可读性并将其转换为可观察的
对输入流执行各种异步转换,例如
2.1 查询另一个 DB B(Mongo,等等)以获取一个项目(或一批项目)的附加数据,然后对其执行(可能是异步的)操作
2.2 为一个项目(或一批项目)查询另一个 DB B(Mongo,无论如何),然后使用该附加数据执行过滤
- 计算输入流并将其与输出流进行比较(因为可能会发生过滤)
虽然我确实有解决所有问题的方法,但我正在处理来自第一个 Mongo-DB 的背压,因为 2. 或 2.1/2.2 可能非常耗时,但初始查询的性能要高得多 ( a.k.a consumers are slow, producers are fast
)
目前,我唯一的解决方案是将 1. 限制在合理的数量以减少初始吞吐量,但这显然会导致整个转换链的吞吐量变慢(生产者需要长时间等待,直到整个链完成工作)
如何实现一个始终忙碌但又不受背压的转型链?
我在这里考虑根据其消费者的当前(或预计)性能(例如转换所花费的时间)动态降低生产者的速度,但这对我来说似乎不是很直观。
spring - Backpressure in a Spring DeferredResult + Akka actors application
I am thinking of using a chain of Akka workers to model a workflow inside a DeferredResult based Spring MVC web application. Essentially the controller will return a DeferredResult and the actors in the chain will work to populate a CompletableFuture which feeds the DeferredResult when completed.
What I am not able to figure out is:
* Will Akka exert back-pressure if this setup takes on too much load.
* If so, how can I detect that this is happening?
c# - gRPC 的 C# 实现是否具有流式背压?
我有一个 gRPC 服务,它接受来自客户端的流式消息。客户端以高速率向服务器发送有限序列消息。
结果是服务器缓冲了大量消息(> 1GB),并且它的内存使用量猛增,然后在处理它们时慢慢耗尽。
我发现即使我等待所有异步调用,客户端也会尽可能快地推送消息。我希望客户放慢速度。
我已经实现了客户端在发送下一条消息之前等待的显式 ack 响应,但是由于 http/2 已经内置了流控制语义,我觉得我有点重新发明轮子。
我有两个具体的问题。
C# 实现会自动应用背压吗?例如,如果消费端在异步流上调用 MoveNext 的速度很慢,那么客户端是否需要更长的时间才能从其对 WriteAsync 的调用中返回?
gRPC 的 C# 实现是否有任何可配置的方式来限制流式 rpc 调用的消息缓冲。例如,限制缓冲消息的数量或限制调用缓冲区中的空间量。
scala - 如何创建具有背压和控制的 Akka 流
我需要使用以下接口创建一个函数:
我的问题是我不知道如何以适合上述界面的方式定义流程。
当我做这样的事情时
结果类型为 Flow[Item, OtherItem, NotUsed]。到目前为止,我还没有在 Akka 文档中找到任何内容。此外,akka.stream.scaladsl.Flow 上的功能仅提供“未使用”而不是控制。如果有人能指出我正确的方向,那就太好了。
一些背景知识:我需要设置几个仅在转换部分进行区分的管道。这些管道是主流的子流,可能由于某种原因而停止(相应的消息到达某个 kafka 主题)。因此我需要控制部分。我们的想法是创建一个 Graph 模板,我只需在其中插入提到的流作为参数(返回它的工厂)。对于特定情况,我们有一个可行的解决方案。为了概括它,我需要这种流程。
java - Observable 切换到 Flowable 时消耗的资源是多少
我们正在将项目从 javaRx 1.x 切换到 javaRx 2.x。我真的无法理解:
- 为什么我应该在我的项目中保留 Observable 而不是到处使用 Flowable。
- 如果我知道没有任何背压的地方,我是否需要使用没有背压设置的 Flowable(默认为 ON_OVERFLOW_ERROR),它在内存、处理器时间使用方面与新的 javaRx 2.x Observable 有什么不同吗?
- 如果我保持 Observable,我会有更多潜在的错误吗?
这里我们有一些解释:RxJava 2.0 中的 Observable 和 Flowable 有什么区别?
如果我在任何地方都使用 Flowable,有什么区别?
concurrency - 如何使用谷歌云功能处理背压
使用谷歌云函数,有没有办法像 AWS Lambda 那样管理执行并发?(https://docs.aws.amazon.com/lambda/latest/dg/concurrent-executions.html)
我的意图是设计一个使用任务文件并将这些任务发布到工作队列(发布/订阅)的函数。我想要一个从工作队列(发布/订阅)中使用任务并执行任务的函数。
以上可能导致大量几乎并发的执行。我的下游消费者服务很慢,一次不能消费很多并发请求。很可能,它会返回 HTTP 429 响应以尝试减慢生产者的速度。
有没有办法像使用 AWS 一样限制给定 Google Cloud 函数的并发性?
rx-java2 - Rx Java 2 在单独的线程上预拉下一个项目
场景:我有一个从数据库中读取的数据流。我想做的是读取一大块数据,处理它并使用 rx-java 2 流式传输它。但是当我处理和流式传输它时,我想在单独的线程上加载下一个数据块(预拉下一个块)。
我试过了:
不幸的是,这会导致 generate 方法在 io 线程上持续运行。我只想要一个预拉。我尝试过使用缓冲区,但这实际上只是最终创建了块列表。
所以基本上当我在一个单独的线程上流式传输当前块时,我想读取下一个块并准备好它。
不确定这是否可能。我需要使用生成,因为没有数据何时结束的概念。
我尝试使用 subscribe(new FlowableSubscriber(){...}) 使用 Subscription::request 但这似乎不起作用。
rx-java2 - FlowableOperator 是否天生就支持背压?
我已经按照 RxJava2 wiki ( https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-liftFlowableOperator
)中的描述实现了一个,除了我在操作中执行了一些测试像那样:onNext()
该运算符是我Flowable
创建的带有背压降的链条的一部分。本质上,它看起来几乎是这样的:
我遇到了以下问题:
- 出现背压,因此
doSomething(n)
无法处理即将到来的上游 - 由于选择了背压策略,项目被丢弃
- 但是 doSomething(n) 在执行 drop 后永远不会收到新项目,而 doSomething(n) 已准备好处理新项目
回顾 David Karnok的优秀博文http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html,看来我需要request(1)
在onNext()
方法中添加一个。但那是用 RxJava1 ......
所以,我的问题是:这个修复在 RxJava2 中是否足以解决我的背压问题?或者我的操作员是否必须实现所有关于原子的东西,耗尽https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-serialization-deferred-actions中描述的东西来正确处理我的背压问题?
注意:我添加了request(1)
它,它似乎工作。但我不知道这是否足够,或者我的操作员是否需要队列排水和原子等棘手的东西。
提前致谢!
apache-kafka - 如何降低 Kafka Spout 的摄取率并启用背压?
我正在使用storm-kafka-client 1.1.1 和storm-core 1.1.0。
我已经调整了以下参数,但无法启用背压并降低 kafka-spout 的摄取率。
Spout每秒消耗2000条消息。
下游 Bolt 处理一条消息需要50 毫秒,即每秒处理 20 条消息。
spout 发出的元组和bolt 执行的元组之间的延迟随着时间的推移而增加。
**如何让 Spout 每秒读取 20 条消息并保持其消耗率与 Bolt 的执行率相同**
我不确定应该为TOPOLOGY_SPOUT_WAIT_STRATEGY和BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK设置什么值
那么上面的参数和值的什么组合可以帮助控制 spout 摄取率?
任何建议将不胜感激。
谢谢卡尼斯卡
c# - RX 术语:当有频繁的可观察通知时,RX 运算符中的异步处理
目的是在 RX 运算符中对稀缺资源进行一些异步工作,例如 Select。当可观察通知的发送速度快于异步操作完成所需的时间时,就会出现问题。
现在我实际上解决了这个问题。我的问题是这种特殊问题的正确术语是什么?它有名字吗?是背压吗?到目前为止,我所做的研究表明,这是某种压力问题,但根据我的理解,不一定是背压。我找到的最相关的资源是: https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0) http://reactivex.io/documentation/operators/backpressure.html
现在到实际代码。假设有一种稀缺资源,它是消费者。在这种情况下,资源在使用时会引发异常。请注意,不应更改此代码。
现在这是使用资源的幼稚实现的问题。抛出错误是因为通知的速度比消费者运行的速度更快。
使用下面的代码,问题就解决了。我通过将已完成的 BehaviorSubject 与 Zip 运算符结合使用来减慢处理速度。本质上,这段代码所做的是采用顺序方法而不是并行方法。
问题 这是背压吗?如果不是,是否还有其他相关术语?