问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
305 浏览

android - 使用 hot observable 避免背压

我正在使用Android ReactiveLocation 库来接收定期位置更新。即使我的应用程序中没有任何东西在使用它们,我也想继续接收位置更新,这样我总是有一个最新的位置,我可以在需要时立即使用。

这就是我开始在我的应用程序的核心组件中获取位置更新的方式。我想将最新值和任何新检测到的位置重新发布给我的应用程序链中更下游的任何订阅者,这就是replay(1)目的。

在应用程序的其他地方,我订阅了这个重新发布的 obervable:

这似乎可以完成工作:我的最终订阅者立即获得最新的位置,并继续接收新的更新,但我想确保当最终订阅者时,我不会在链中某处建立大量未使用位置的缓冲区未订阅。我是 Rx 菜鸟。背压如何应用于这种情况?正在replay(1)做我所期望的,并丢弃除最新位置之外的所有不需要的位置?

0 投票
3 回答
5942 浏览

node.js - 从nodejs中的可写流中暂停管道可读流的正确方法是什么?

我正在编写一个模块,它是一个可写流。我想为我的用户实现管道接口。

如果发生一些错误,我需要暂停可读流并发出错误事件。然后,用户将决定 - 如果他可以接受错误,他应该能够恢复数据处理。

我看到节点为我们提供了readable.pause()方法,可以用来暂停可读流。但我无法从可写流模块中调用它:

如何在可写流中实现背压?

PS 可以使用pipe/unpipe提供可读流作为参数的事件。但也有人说,对于管道流,暂停的唯一机会是将可读流从可写流中分离出来。

我做对了吗?在用户调用恢复之前,我必须解除可写流的管道?在用户调用恢复之后,我应该将可读流返回吗?

0 投票
2 回答
1080 浏览

java - RxJava:丢弃物品?- 背压

我使用 RxJava 来观察点击几个按钮。

这些订阅将在一个对象上调用不同的函数,这需要几毫秒。这些功能是同步的。

问题是当按下太多按钮时,我会遇到背压异常。对我有用的是放弃几个输入(最好是旧的)。RxJava 有可能吗?

0 投票
1 回答
176 浏览

javascript - Highland.js 如何实现背压?

究竟是如何Highland.js实现背压的?我很好奇这个。

0 投票
0 回答
326 浏览

java - 如何处理 SyncOnSubscribe RxJava 中的 generateState 错误?

我试图掌握创建 SyncOnSubscribe 的窍门,但不太确定如果在generateState状态初始化期间该方法失败该怎么办。

我可以想到几种可以临时处理的方法:

  1. 如果抛出运行时异常,库是否会自动调用o.onError?(见编辑)。
  2. 我可以包装MyState另一个存储错误的变量,我可以o.onError在第一次调用时自行next调用。

我只是好奇是否有建议的做法?

谢谢!

编辑:我尝试在generateState方法中抛出一个运行时异常,我认为它指向我将在方法MyState中检查的错误包装。next如果您有更好的建议,请发表评论/回答。

使用main上面的函数getEventsOnSubscribe1getEventsSyncOnSubscribe都调用了订阅者onError,但他们让程序挂起。使用getEventsOnSubscribe2which 包装异常并手动调用s.onError,程序能够退出。

0 投票
2 回答
3409 浏览

java - 为 Cassandra Writes 获得背压的最佳方法是什么?

我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经使用maxRequestsPerConnection和设置了我的 Cassandra 集群maxConnectionsPerHost。但是,在测试中,我发现当我到达maxConnectionsPerHostmaxRequestsPerConnection呼吁session.executeAsync不要阻止时。

我现在正在做的是使用 anew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)并在每个异步请求之前增加它,并在完成时返回它executeAsync。这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。

有没有人想出更好的解决方案来解决这个问题?

一个警告:我希望一个请求在完成之前被认为是未完成的。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性的超时)是我想要背压并停止使用队列中的消息的主要情况。

问题:

当前解决方案:

另外,任何人都可以看到这个解决方案有任何明显的问题吗?

0 投票
1 回答
182 浏览

android - 为什么在使用 backpreasurebuffer 时会出现 MissingBackpreasureException?

我想在 RxJava 中实现一个下载一些文件的处理队列。我要下载的文件数量可能多达 100 个左右。

一切都是在 Android 上使用 RxJava 1.1.1 开发的

我当前的实现看起来像这样:

Where_getObserver()返回一个新的观察者对象,该对象在“onNext”方法中下载到文件中。

但是,我的问题是我很快得到了一个MissingBackpreasureException我不明白的。我尝试实现 a backpreasurebuffer,但它似乎没有被调用。

我究竟做错了什么?

0 投票
1 回答
135 浏览

clojure - core.async 切换通道组合

我正在试验 core.async 的混音。似乎在混音中使输入通道静音将是实现背压的一种可能方式。我正在使用下面的代码:

评估 REPL 中的最后一行给出

CompilerException java.lang.IllegalArgumentException: No implementation of method: :toggle* of protocol: #'clojure.core.async/Mix found for class: java.lang.Boolean.

上面的示例代码有什么问题?

谢谢!

0 投票
1 回答
4696 浏览

apache-spark - Spark Streaming Kafka 背压

我们有一个 Spark Streaming 应用程序,它从接收器中的 Kafka 队列中读取数据,并进行一些转换并输出到 HDFS。批处理间隔为 1 分钟,我们已经调整了背压和spark.streaming.receiver.maxRate参数,因此大部分时间都可以正常工作。

但是我们还有一个问题。当 HDFS 完全宕机时,批处理作业会长时间挂起(假设 HDFS 不工作 4 小时,作业会挂起 4 小时),但接收方不知道作业未完成,因此它仍在接收接下来 4 小时的数据。这导致OOM异常,整个应用程序宕机,我们丢失了很多数据。

所以,我的问题是:是否有可能让接收者知道工作没有完成,所以它会收到更少(甚至没有)的数据,当工作完成时,它会开始接收更多的数据来赶上。在上述情况下,当HDFS down时,receiver会从kafka中读取较少的数据,并且接下来4小时产生的block真的很小,receiver和整个应用程序都没有down,在HDFS ok后,receiver会读取更多数据并开始迎头赶上。

0 投票
1 回答
115 浏览

caching - Play 中是否会有 Reactive Cache API

我们在使用Play-Cache时遇到了 API 阻塞的问题。有没有人知道在 Play 中提出一个新的缓存 API 的计划,它不仅是非阻塞的,而且对背压有反应?

我看过Play-Redis,但它似乎不是完全非阻塞的,而且可能不是反应式的。