问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1087 浏览

scala - 在涉及多个 JVM 的 Akka Streams 中保持背压的方法

我知道从 Akka 2.4.16 开始,没有 Reactive Streams 的“远程”实现。该规范侧重于在单个 JVM 上运行的流。

但是,考虑到用例涉及另一个 JVM 进行某些处理,同时保持背压。这个想法是让一个主应用程序提供一个运行流的用户界面。例如,这个流有一个阶段执行一些应该在不同机器上运行的繁重计算。我对以分布式方式运行流的方法感兴趣 - 我遇到了一些文章指出了一些想法:

还有哪些其他选择?以上有什么明显的缺点吗?有什么特殊的特征需要考虑吗?

更新:这个问题不限于单个用例。我通常对在分布式环境中使用流的所有可能方式感兴趣。这意味着,例如,它可以只涉及一个将参与者集成在一起的流,.mapAsync或者例如,在通过 Akka HTTP 通信的两台机器上可能有两个单独的流。唯一的要求是必须在所有组件之间强制执行背压。

0 投票
1 回答
7012 浏览

hadoop - Spark Streaming 中的背压属性如何工作?

我有一个接收单个事件(字符串)的 CustomReceiver。收到的单个事件在 spark 应用程序的运行时用于从 nosql 读取数据并应用转换。当观察到每个批次的处理时间大于批次间隔时,我设置此属性。

spark.streaming.backpressure.enabled=true

之后,当批处理的处理时间超过批处理窗口时,我希望 CustomReceiver 不会触发和接收事件,这并没有发生,并且仍在添加积压的批处理。我在这里错过了什么吗?

0 投票
1 回答
81 浏览

java - 如何正确(实时)传递可观察的元素?

假设我有一个 aobservable,它发出 2D 点,表示某些路线的转折点

这意味着机器人从原点开始(0,0),然后到矩形的左下角,然后是右下角,然后返回,最后回到原点。

假设现在我想要一个 observable,它提供的点就像机器人以恒定的速度移动一样,比如每秒 1 个单位。

因此,第一个操作员按原样接收(0,0)并重新传输它。

然后它接收(0, 10)并看到,到该点的距离是单位,到达该点10需要几秒钟。10所以,新的 observable 应该发出

每秒一对,直到最后一个接收点到达并重新传输。然后操作员应该采取下一点并对其进行同样的操作。

如何使用 ReactiveX 实现这一点?

我怀疑我们应该在这里实施“bakpressure”来减慢可观察到的源,直到结果及时重新发射所有东西?

更新

我写了下面的代码,它可以工作。它使用两个flatMaps。一个是提供额外的点,并且每个点都带有时间戳(以秒为单位),另一个flatMap是引入适合时间戳的延迟。

该代码有效,但看起来很复杂。问题仍然存在:ReactiveX 库是否包含用于这些事情的现成工具:

0 投票
1 回答
507 浏览

rx-java - Observable 支持反应拉动

我一直在努力解决我认为是一个非常基本的问题。

我有一个Flowable从网络中检索一组项目并发出它们的方法。

我需要它在每个请求的基础上工作。也就是说,我保留一个Subscriptionsubscription.request(n)在必要时调用。

关于Backpressure的文章(“Reactive pull”部分)描述了Observer的观点,并简单地提到了

但是,要使 [reactive pull] 起作用,Observables A 和 B 必须正确响应 request() <...> 这种支持不是 Observables 的要求

它没有详细说明原则上如何实现这种支持。

是否有实现此类Observable/的通用模式Flowable?我没有找到一个很好的例子或参考。我能想到的一个糟糕的选择是有一个监视器,我会在while循环中锁定并从另一个线程emitter.requested() == 0解锁。doOnRequest()但是这种方法只是感觉很麻烦。

那么这种“减速”一般是如何实现的呢?

谢谢。

0 投票
1 回答
393 浏览

akka - Backpressure to ReactiveKafka when sending to sharded actors

I have written an Akka application which takes input from Kafka and then processes the data with sharded actors and output to Kafka.

But in some occasions the sharded regions can't handle the load, and I get:

You should probably implement flow control to avoid flooding the remote connection.

How can I implement backpressure in this chain/flow?

Kafka Consumer -> Shared Actors -> Kafka Producer

Some fragments from the code:

0 投票
0 回答
99 浏览

javascript - 使用 XHR 频率太高的 RxJ 会导致备份请求

我正在ng-cli使用Angular 2RxJs。我正在尝试做的简单解释是:我正在使用 RxJs 不断地将 XHR 生成到后端 REST 端点。我注意到,如果我制作这些 XHR 的频率是 100 毫秒、500 毫秒甚至 1000 毫秒,通过查看 Chrome/FireFox 开发者工具的网络视图,我可以看到很多 XHR 处于待处理状态;它们开始需要 200 毫秒到 1.5 秒,但随着 XHR 数量的增加,这些请求需要 10 秒到 20 秒才能完成;它们堆积起来,所有后来的 XHR 最终都处于未决状态。

TypeScriptRxJs对象一起使用的代码如下所示。

我已经阅读过websockets,这可能是将数据(如果它在服务器上更改)推送到客户端的更好方法,而不是这种拉数据的方法(无论它是否在服务器上更改)。但那是另一篇文章,IMO。

无论如何,关于如何改进此代码的任何想法?

0 投票
1 回答
147 浏览

scala - 如何创建不受背压影响的 Source

我想测试一些 Akka 流功能,例如conflate. 为此,我需要在一个简单的单元测试中构建一个不受背压影响的源。天真的尝试

由于背压而无法工作。OTOH 通过 HTTP 可能是矫枉过正。

如何为不受背压影响的单元测试创​​建一个简单的? Source

0 投票
1 回答
5376 浏览

java - rxjava 2 中 BackpressureStrategy.BUFFER 和 onBackpressureBuffer 运算符之间的区别

我是响应式编程领域的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。

以下是我试图实现的工作流程:

  1. Flowable 的连续字符串流。

  2. 执行耗时操作并将消息更改为另一个字符串

  3. 执行另一个耗时的操作。

现在我正在使用以下代码:

现在对于小型操作,我没有看到任何与背压相关的问题。

但是对于大型流,我不知道它会如何表现。

现在我的问题是:-

  1. 在BackpressureStrategy.BUFFER的情况下,默认缓冲区大小是多少?数据在哪里被缓冲?

  2. 如果我想在每个耗时的任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 运算符吗?

  3. 如果缓冲区已满,我不想丢失数据,我想等待或在那种情况下?

0 投票
1 回答
194 浏览

scala - Akka Streams 的 ActorPublisher 作为 Web 响应的源 - 背压的工作原理

我使用akka-streamsActorPublisher演员作为每个连接Source的数据流,这些数据被发送到传入的 WebSocket 或 HTTP 连接。

ActorPublisher合同是通过提供下游可以接受的元素数量来定期请求数据如果需求为 0,我不应该发送更多元素。我观察到,如果我缓冲元素,当消费者速度较慢时,缓冲区大小在 1 到 60 之间波动,但大多在 40-50 附近。

为了流式传输,我使用akka-http将 WebSocket 输出和HttpResponse数据设置为 a Sourceof Messages(或ByteStrings)的能力。

我想知道在这种情况下背压是如何工作的——当我通过网络将数据流式传输到客户端时。这些数字究竟是如何计算的?它是否检查网络级别发生的事情?

0 投票
1 回答
1185 浏览

java - RxJava2 可观察的背压

最近我意识到我不明白RxJava2背压是如何工作的。

我做了一个小测试,我希望它应该失败并出现MissingBackpressureException异常:

系统输出显示如下:

为什么它不产生MissingBackpressureException

我希望这e.onNext(i);会将项目放入缓冲区ObservableObserveOn并在其大小大于之后static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

它应该抛出MissingBackpressureException不会发生的。缓冲区会自动增长吗?如果不是,物品存放在哪里?