问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
397 浏览

scala - Akka Sharding 与持久演员的背压

是否有 Akka 流背压模型或用于将 akka 分片与持久演员一起使用的东西?

因为我有一个问题,我有一个带有持久演员的 akka 分片集群。(使用 cassandra 作为日志插件)。
有时应该同时创建很多演员。(例如,当我们想向所有用户发送广播消息时。)
并且成千上万的持久性参与者试图在短时间内(例如 3 秒)恢复,并且在负载很重的情况下,cassandra 无法一次响应并且很多参与者失败多次恢复,再也没有恢复。

0 投票
2 回答
4044 浏览

apache-spark - Spark Structured Streaming 如何处理背压?

我正在分析 Spark Structured Streaming 的背压特性。有谁知道细节?是否可以通过代码调整进程传入记录?谢谢

0 投票
0 回答
351 浏览

c# - 背压 .net 反应式 - 快速可观察

我正在使用 .NET 响应式构建生产者..消费者模式。生产者从 Kafka 消息总线读取消息。一旦消息被读取,就需要交给消费者来处理消息。

我能够使用 .NET 反应式扩展(可观察和观察)来做到这一点。但是,我想处理从总线读取消息速度更快而消费者落后的情况。我希望 observable 对背压做出反应,即如果 observable 表明它仍在处理较早的消息,请减慢 observable。

0 投票
3 回答
7946 浏览

rx-java - Rxjava 中的“背压”一词是什么意思?

我是 RxJava 的初学者,我很好奇“背压”的含义。

这是否意味着生产者背着消费者施加压力?

或者这是否意味着消费者正在向生产者施加压力?(相反方向的压力)

0 投票
5 回答
3879 浏览

javascript - RxJs:zip 运算符的有损形式

考虑使用zip运算符将两个无限 Observable 压缩在一起,其中一个发出项目的频率是另一个的两倍。
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后在它们的发射速率之间切换,第一个 Observable 最终将赶上另一个。
随着缓冲区越来越大,这将在某些时候导致内存爆炸。
如果第一个 observable 将发射几个小时的项目,而第二个将在最后发射一个项目,也会发生同样的情况。

我如何实现此操作员的有损行为?我只想在我从两个流中获得排放的任何时候排放,我不在乎我错过了更快的流中有多少排放。

说明:

  • 我在这里要解决的主要问题是由于zip运算符的无损特性导致的内存爆炸。
  • 即使两个流每次都发出相同的值,我也想随时从两个流中获得排放

例子:

常规zip将产生以下输出:

我希望它产生的输出:

说明:
有损zip算子的zip缓冲区大小为1。这意味着它只会保留最先发出的流中的第一项,并且会丢失所有其余的项(到达第一项和第二个流的第一次发射之间的项)。因此,示例中发生的情况如下:stream1emits 1,有损 zip “记住”它并忽略所有项目,stream1直到stream2发出。第一次发射stream2就是10这么stream12。在相互发射(有损的第一个发射zip)之后,它重新开始: "remember" 3, "loose" 4, emit [3,20]。然后重新开始: "remember" 5、 "loose"67, emit [5,30]。然后重新开始:40, "松散" 50, 60,70并等待下一个项目stream1

示例 2:

在这种情况下,普通zip操作员会爆炸内存。
我不希望它。

总结:
本质上,我希望有损zip运算符只记住stream 1 先前相互发射之后发出的第一个值,并在stream 2赶上stream 1. 并重复。

0 投票
0 回答
1105 浏览

akka - 如何做 akka-http 请求级背压?

在 akka-http 中,您可以:

  1. 设置 akka.http.server.max-connections,这将阻止超过该数量的连接。超过此限制意味着客户端将获得连接超时。
  2. 设置 akka.http.server.pipelining-limit,它可以防止给定的连接一次有超过这个数量的未完成请求。超过这意味着客户端将获得套接字超时。

这些都是从 http 服务器到客户端的背压形式,但两者都非常低级,并且仅与服务器的性能间接相关。

根据服务器看到的请求率,似乎更好的是在 http 级别进行背压。可能通过返回 429 - Too Many Requests。请求率也可以说是性能的间接衡量标准,但它似乎比连接数更接近。

这似乎是一件相当合理的事情,但我很难找到任何现有的模式。这是我能找到的最接近的参考:https ://github.com/akka/akka-http/issues/411

据我所知,最好的方法是抓住Flow你把你Route变成的东西,并将其插入到一个图表中,该图表具有全局请求率度量(或者可能是单个处理队列)和绕过Route(完全返回 429 或其他)。

有更好的想法吗?

0 投票
0 回答
246 浏览

scala - 如何控制akka流背压缓慢的下游api

我有一个 API 调用另一个下游 API。下游 api 有其局限性,它的吞吐量每秒只能处理 75 个请求。

我想使用 akka 流来控制 API 每秒不超过 75 个下游 api 请求。

这是代码片段。

当我通过 Gatling 每秒发送 100 个请求时。由于下游 api 无法处理负载,30% 的请求发送失败。有没有办法应用背压进行配置,例如如果无法处理,则避免进行下游 api 调用。

0 投票
0 回答
852 浏览

scala - 使用 akka-http 流的选择性请求限制

在此处输入图像描述

我有一个 API 调用另外两个下游 API。一个下游 api ( https://test/foo) 非常重要,而且速度非常快。另一个慢速下游 api( https://test/bar) 有其局限性,它的吞吐量每秒只能处理 50 个请求。

我想确保下游 apihttps://test/foo的优先级高于https://test/bar. 比如API线程池为75,我只允许50个并行传入连接通过https://test/bar。其余连接应用于https://test/bar. 它将https://test/bar永远不会失败。

我想我应该用 OverflowStrategy.dropNew 应用油门或缓冲https://test/bar

这是代码片段。

问题 1:我应该在哪里throttle(50, 1 seconds, 5000, ThrottleMode.Shaping)只符合https://test/bar阈值。

问题 2:如果我想优先处理请求,是否需要应用bufferOverflowStrategy.dropNew 。https://test/foo换句话说,https://test/bar应该删除所有不必要的连接。

问题3:有没有更好的方法来实现这个要求。我connection.handleWith[Flow[HttpRequest, HttpResponse]]在 Sink 中使用,我不确定这是正确的地方。

如果提供了一些代码片段,那将非常感激并且超级棒:)

提前致谢

0 投票
1 回答
941 浏览

java - 使用 RxJava2 创建一个带有 generate 函数的 flowable

我需要创建一个自定义 Flowable 并实现背压。我正在尝试实现某种分页。这意味着当下游请求 5 个项目时,我将“向数据源询问”项目 0 - 5。然后当下游需要另外 5 个项目时,我将获取项目 5 - 10 并发回。

到目前为止我发现的最好的事情是使用Flowable.generate方法,但我真的不明白为什么没有办法(据我所知)如何获取requested下游请求的项目数量。我可以使用state生成器的属性来保存最后请求的项目的索引,所以我只需要新请求的项目的数量。我在 BiFunctionapply中得到的发射器实例GeneratorSubscription是从AtomicLong. 所以投射 emmiterAtomicLong可以得到我请求的号码。但我知道这不是“推荐”的方式。

另一方面,当您使用时,Flowable.create您会得到具有long requested()方法的 FlowableEmitter。usinggenerate更适合我的用例,但现在我也很好奇什么是“正确”的使用方式Flowable.generate

也许我想太多了,所以请指出我正确的方向。:) 谢谢。

这是实际代码的样子(在 Kotlin 中):

0 投票
1 回答
661 浏览

c - 如何在 cassandra 中运行 EC2 告密者配置

我正在尝试在 Ec2 snitch 配置中完成单个 cassandra 节点的启动。

我为此配置更改的 yaml 属性是:

所有其他的 thrift、rpc_broadcast、broadcast 都被注释掉并且没有被启用。

尝试启动连接时出现此错误

INFO [main] 2017-12-04 16:54:15,021 RateBasedBackPressure.java:123 - 高比率的初始化背压:0.9,因子:5,流量:FAST,窗口大小:2000。INFO [main] 2017-12 -04 16:54:15,022 DatabaseDescriptor.java:725 - 使用策略 org.apache.cassandra.net.RateBasedBackPressure {high_ratio=0.9, factor=5, flow=FAST} 禁用背压。启动期间遇到异常(org.apache.cassandra.exceptions.ConfigurationException):实例化告密类“org.apache.cassandra.locator.Ec2Snitch”时出错。org.apache.cassandra.exceptions.ConfigurationException:实例化告密类“org.apache.cassandra.locator.Ec2Snitch”时出错。在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java:543) 在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java: newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect.Constructor.newInstance(Constructor.java:423) 在 java.lang.Class.newInstance(Class .java:442) at org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java:528) ... 还有 8 个错误 [main] 2017-12-04 16:54:18,308 CassandraDaemon.java:706 - 异常在启动 org.apache.cassandra.exceptions.ConfigurationException 期间遇到:实例化 snitch 类 'org.apache.cassandra.locator.Ec2Snitch' 时出错。在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java:543) ~[apache-cassandra-3.11.1.jar:3.11.1] 在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities. java:521) ~[apache-cassandra-3.11.1.jar:3.11.

我无法找出错误的最初原因。我已经检查了 rack-dc 和 topology-properties 文件中的任何属性的初始化。但我没有发现任何值得引起这个问题的东西。该节点甚至没有初始化初始模式。谁能帮我解决这个问题?如果可能,请帮助我在 Ec2 snitch 配置中设置多节点设置。我已经尝试了所有可能的方法来建立具有 Ec2 配置的单节点。所以,我想把它贴在 SO 中,让大智慧帮助我解决这个问题。

提前致谢。