问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
771 浏览

akka-stream - 应用级背压 VS TCP 原生流控

鉴于 TCP 提供本机流量控制,我正在研究为什么某些系统实现应用程序级背压。

我正在阅读,特别是akka-streams和(更高级别的讨论)反应流

是不是只是把异步通信的思想抽象出了网络,脱离了TCP协议?

其他更准确的问题:

  • 如果应用程序(比如 akka-streams 应用程序)最终通过 TCP 进行通信,它会退回到 TCP 的本机背压吗?

  • 反应流是否通过简单地让 TCP 处理它来在 TCP 之上实现应用程序级背压?

任何帮助和指示将不胜感激!谢谢 :)

0 投票
2 回答
13694 浏览

apache-kafka - Kafka中的背压

我在 Kafka 中遇到过一种情况,即生产者发布消息的速度远远高于消费者的消费速度。我必须在kafka中实现背压实现,以便进一步消费和处理。

请让我知道如何在 spark 和普通的 java api 中实现。

0 投票
1 回答
1286 浏览

amazon-web-services - 当缓慢的消费者在流处理(spark、aws)中强制背压时避免数据丢失

我是分布式流处理(Spark)的新手。我已经阅读了一些教程/示例,这些教程/示例涵盖了背压如何导致生产者减速以响应过载的消费者。给出的经典示例是摄取和分析推文。当流量出现意外高峰时,消费者无法处理负载,他们会施加背压,生产者会通过调低速率来做出响应。

我没有真正看到的是在实践中使用了哪些方法来处理由于整个管道容量较低而无法立即处理的大量传入实时数据?

我想这个问题的答案取决于业务领域。对于某些问题,只删除该数据可能会很好,但在这个问题中,我想关注我们不想丢失任何数据的情况。

由于我将在 AWS 环境中工作,我的第一个想法是在 SQS 队列或 Kinesis 流中“缓冲”多余的数据。在实践中是否像这样简单,或者对于这个问题有更标准的流式解决方案(可能是 Spark 本身的一部分)?

0 投票
1 回答
1273 浏览

node.js - node.js 如何处理带背压的快速生产者和慢速消费者

我是 node.js 的新手,不了解有关流的文档。希望能得到一些提示。

我正在读取一个非常大的文件行,然后对于每一行我都调用一个异步网络 api。

显然,本地文件的读取速度比异步调用完成的速度要快得多:

在“执行”方法中添加背压的方法是什么?

0 投票
0 回答
374 浏览

java - 如何设置 Spring Controller 来处理背压请求

Spring MVC 控制器将用作另一个服务的代理,以服务可能的大文件。

为了避免在任何给定时间将来自另一个服务的整个 WebClient 响应存储在内存中,我想使用 Spring 5 + Reactor 项目的反应性属性并在浏览器使用它时流式传输其响应。

当前代码看起来像

它导致

  • 如何配置控制器来处理Mono<ClientResponse>
  • 如果响应是类型Mono的,请求会被背压吗?是否需要将其转换为字节块并制作它Flow
  • Spring 文档说明了背压的很多好处,但没有回答上述问题,也没有给出任何关于Mono响应的警告。它仍然是反应性的吗?
0 投票
1 回答
56 浏览

rxjs - 使用背压处理来自 rxjs 可观察订阅的数据的正确方法

我有一个rxjs.observable(rxjs 版本 6.2.1)返回urls我需要向其发出GET请求。

对于每一个url我需要通过函数发出请求processURL(url)。根据反应哲学,处理所有这些传入的 url 并逐个发出请求而不是在subscribe发出数据后立即触发所有请求的正确方法是什么?请注意,在这种情况下,observableurlObservable$返回数据的速度将比需要使用返回的url.

processURL可以返回一个promise.

谢谢。

0 投票
0 回答
251 浏览

rx-java2 - 使用间隔时的背压

我有这个 RxJava2 可流动的

和这个订阅者

它产生一个MissingBackPressureException,我“有点”理解:由于我在同一个调度程序上订阅和观察,我最初认为可能不会发生背压,但也许这是一个误解。

当我将流动性更改为

我不再得到异常,但也没有调用 onNext。这是为什么?

0 投票
1 回答
19499 浏览

java - Spring Web-Flux 中的背压机制

我是Spring Web-Flux的初学者。我写了一个控制器如下:

我知道反应性好处之一是Backpressure,它可以平衡请求或响应率。我想了解如何在Spring Web-Flux中具有背压机制。

0 投票
1 回答
429 浏览

java - RxJava 1.x:如何在单元测试中模拟背压

给定代码:

在哪里,

我有一个单元测试:

测试失败,因为尽管有Thread.sleep,但没有产生背压。我不明白为什么不这样做;内部环形缓冲区应在 2 个项目后填满,其余项目应丢弃。

0 投票
1 回答
397 浏览

apache-flink - Apache Flink:架构问题:背压和处理故障模式

我刚开始阅读有关 Flink 的内容,想了解更多有关 Flink 如何处理背压以及在存在背压时如何处理故障的信息。我已经阅读了 data-artisans https://data-artisans.com/blog/how-flink-handles-backpressure的这篇博客文章,并且有点理解缓冲池的概念以及它们如何处理背压。

我的问题是

(1) 水印在远程交换的情况下有何帮助(当任务在不同的节点上并且缓冲区被复制到有线 TCP 时)——在上面的博客中阅读。

(2) 系统面临背压时出现故障如何处理?

(3) 只是用于恢复的异步检查点吗?这个检查点包括什么?

(4) 一个检查点捕获了多少缓冲池的数据状态?