问题标签 [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Akka Sharding 与持久演员的背压
是否有 Akka 流背压模型或用于将 akka 分片与持久演员一起使用的东西?
因为我有一个问题,我有一个带有持久演员的 akka 分片集群。(使用 cassandra 作为日志插件)。
有时应该同时创建很多演员。(例如,当我们想向所有用户发送广播消息时。)
并且成千上万的持久性参与者试图在短时间内(例如 3 秒)恢复,并且在负载很重的情况下,cassandra 无法一次响应并且很多参与者失败多次恢复,再也没有恢复。
apache-spark - Spark Structured Streaming 如何处理背压?
我正在分析 Spark Structured Streaming 的背压特性。有谁知道细节?是否可以通过代码调整进程传入记录?谢谢
c# - 背压 .net 反应式 - 快速可观察
我正在使用 .NET 响应式构建生产者..消费者模式。生产者从 Kafka 消息总线读取消息。一旦消息被读取,就需要交给消费者来处理消息。
我能够使用 .NET 反应式扩展(可观察和观察)来做到这一点。但是,我想处理从总线读取消息速度更快而消费者落后的情况。我希望 observable 对背压做出反应,即如果 observable 表明它仍在处理较早的消息,请减慢 observable。
rx-java - Rxjava 中的“背压”一词是什么意思?
我是 RxJava 的初学者,我很好奇“背压”的含义。
这是否意味着生产者背着消费者施加压力?
或者这是否意味着消费者正在向生产者施加压力?(相反方向的压力)
javascript - RxJs:zip 运算符的有损形式
考虑使用zip运算符将两个无限 Observable 压缩在一起,其中一个发出项目的频率是另一个的两倍。
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后在它们的发射速率之间切换,第一个 Observable 最终将赶上另一个。
随着缓冲区越来越大,这将在某些时候导致内存爆炸。
如果第一个 observable 将发射几个小时的项目,而第二个将在最后发射一个项目,也会发生同样的情况。
我如何实现此操作员的有损行为?我只想在我从两个流中获得排放的任何时候排放,我不在乎我错过了更快的流中有多少排放。
说明:
- 我在这里要解决的主要问题是由于
zip
运算符的无损特性导致的内存爆炸。 - 即使两个流每次都发出相同的值,我也想随时从两个流中获得排放
例子:
常规zip
将产生以下输出:
我希望它产生的输出:
说明:
有损zip
算子的zip
缓冲区大小为1
。这意味着它只会保留最先发出的流中的第一项,并且会丢失所有其余的项(到达第一项和第二个流的第一次发射之间的项)。因此,示例中发生的情况如下:stream1
emits 1
,有损 zip “记住”它并忽略所有项目,stream1
直到stream2
发出。第一次发射stream2
就是10
这么stream1
松2
。在相互发射(有损的第一个发射zip
)之后,它重新开始: "remember" 3
, "loose" 4
, emit [3,20]
。然后重新开始: "remember" 5
、 "loose"6
和7
, emit [5,30]
。然后重新开始:40
, "松散" 50
, 60
,70
并等待下一个项目stream1
。
示例 2:
在这种情况下,普通zip
操作员会爆炸内存。
我不希望它。
总结:
本质上,我希望有损zip
运算符只记住stream 1
先前相互发射之后发出的第一个值,并在stream 2
赶上stream 1
. 并重复。
akka - 如何做 akka-http 请求级背压?
在 akka-http 中,您可以:
- 设置 akka.http.server.max-connections,这将阻止超过该数量的连接。超过此限制意味着客户端将获得连接超时。
- 设置 akka.http.server.pipelining-limit,它可以防止给定的连接一次有超过这个数量的未完成请求。超过这意味着客户端将获得套接字超时。
这些都是从 http 服务器到客户端的背压形式,但两者都非常低级,并且仅与服务器的性能间接相关。
根据服务器看到的请求率,似乎更好的是在 http 级别进行背压。可能通过返回 429 - Too Many Requests。请求率也可以说是性能的间接衡量标准,但它似乎比连接数更接近。
这似乎是一件相当合理的事情,但我很难找到任何现有的模式。这是我能找到的最接近的参考:https ://github.com/akka/akka-http/issues/411
据我所知,最好的方法是抓住Flow
你把你Route
变成的东西,并将其插入到一个图表中,该图表具有全局请求率度量(或者可能是单个处理队列)和绕过Route
(完全返回 429 或其他)。
有更好的想法吗?
scala - 如何控制akka流背压缓慢的下游api
我有一个 API 调用另一个下游 API。下游 api 有其局限性,它的吞吐量每秒只能处理 75 个请求。
我想使用 akka 流来控制 API 每秒不超过 75 个下游 api 请求。
这是代码片段。
当我通过 Gatling 每秒发送 100 个请求时。由于下游 api 无法处理负载,30% 的请求发送失败。有没有办法应用背压进行配置,例如如果无法处理,则避免进行下游 api 调用。
scala - 使用 akka-http 流的选择性请求限制
我有一个 API 调用另外两个下游 API。一个下游 api ( https://test/foo
) 非常重要,而且速度非常快。另一个慢速下游 api( https://test/bar
) 有其局限性,它的吞吐量每秒只能处理 50 个请求。
我想确保下游 apihttps://test/foo
的优先级高于https://test/bar
. 比如API线程池为75,我只允许50个并行传入连接通过https://test/bar
。其余连接应用于https://test/bar
. 它将https://test/bar
永远不会失败。
我想我应该用 OverflowStrategy.dropNew 应用油门或缓冲https://test/bar
。
这是代码片段。
问题 1:我应该在哪里throttle(50, 1 seconds, 5000, ThrottleMode.Shaping)
只符合https://test/bar
阈值。
问题 2:如果我想优先处理请求,是否需要应用buffer和OverflowStrategy.dropNew 。https://test/foo
换句话说,https://test/bar
应该删除所有不必要的连接。
问题3:有没有更好的方法来实现这个要求。我connection.handleWith[Flow[HttpRequest, HttpResponse]]
在 Sink 中使用,我不确定这是正确的地方。
如果提供了一些代码片段,那将非常感激并且超级棒:)
提前致谢
java - 使用 RxJava2 创建一个带有 generate 函数的 flowable
我需要创建一个自定义 Flowable 并实现背压。我正在尝试实现某种分页。这意味着当下游请求 5 个项目时,我将“向数据源询问”项目 0 - 5。然后当下游需要另外 5 个项目时,我将获取项目 5 - 10 并发回。
到目前为止我发现的最好的事情是使用Flowable.generate
方法,但我真的不明白为什么没有办法(据我所知)如何获取requested
下游请求的项目数量。我可以使用state
生成器的属性来保存最后请求的项目的索引,所以我只需要新请求的项目的数量。我在 BiFunctionapply
中得到的发射器实例GeneratorSubscription
是从AtomicLong
. 所以投射 emmiterAtomicLong
可以得到我请求的号码。但我知道这不是“推荐”的方式。
另一方面,当您使用时,Flowable.create
您会得到具有long requested()
方法的 FlowableEmitter。usinggenerate
更适合我的用例,但现在我也很好奇什么是“正确”的使用方式Flowable.generate
。
也许我想太多了,所以请指出我正确的方向。:) 谢谢。
这是实际代码的样子(在 Kotlin 中):
c - 如何在 cassandra 中运行 EC2 告密者配置
我正在尝试在 Ec2 snitch 配置中完成单个 cassandra 节点的启动。
我为此配置更改的 yaml 属性是:
所有其他的 thrift、rpc_broadcast、broadcast 都被注释掉并且没有被启用。
尝试启动连接时出现此错误
INFO [main] 2017-12-04 16:54:15,021 RateBasedBackPressure.java:123 - 高比率的初始化背压:0.9,因子:5,流量:FAST,窗口大小:2000。INFO [main] 2017-12 -04 16:54:15,022 DatabaseDescriptor.java:725 - 使用策略 org.apache.cassandra.net.RateBasedBackPressure {high_ratio=0.9, factor=5, flow=FAST} 禁用背压。启动期间遇到异常(org.apache.cassandra.exceptions.ConfigurationException):实例化告密类“org.apache.cassandra.locator.Ec2Snitch”时出错。org.apache.cassandra.exceptions.ConfigurationException:实例化告密类“org.apache.cassandra.locator.Ec2Snitch”时出错。在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java:543) 在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java: newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect.Constructor.newInstance(Constructor.java:423) 在 java.lang.Class.newInstance(Class .java:442) at org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java:528) ... 还有 8 个错误 [main] 2017-12-04 16:54:18,308 CassandraDaemon.java:706 - 异常在启动 org.apache.cassandra.exceptions.ConfigurationException 期间遇到:实例化 snitch 类 'org.apache.cassandra.locator.Ec2Snitch' 时出错。在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities.java:543) ~[apache-cassandra-3.11.1.jar:3.11.1] 在 org.apache.cassandra.utils.FBUtilities.construct(FBUtilities. java:521) ~[apache-cassandra-3.11.1.jar:3.11.
我无法找出错误的最初原因。我已经检查了 rack-dc 和 topology-properties 文件中的任何属性的初始化。但我没有发现任何值得引起这个问题的东西。该节点甚至没有初始化初始模式。谁能帮我解决这个问题?如果可能,请帮助我在 Ec2 snitch 配置中设置多节点设置。我已经尝试了所有可能的方法来建立具有 Ec2 配置的单节点。所以,我想把它贴在 SO 中,让大智慧帮助我解决这个问题。
提前致谢。