问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
547 浏览

javafx - RxJava - 背压键盘输入?

这是一个有趣的 RxJava 问题。

我想在输入每个字符时使用 RxJava 背压运算符快速查找输入的输入,就像 Google 在其搜索页面上所做的那样。我浏览了Backpressure文档并想出了这个(我正在使用 RxJavaFX/RxKotlinFX 来利用 JavaFX)。

这很好用。如果我对控件输入“Hello”,它将String在 200 毫秒无输入后发出“Hello”。但如果我想让它真正响应更快,我实际上应该为每个按键输入进行某种滚动累积。然后我的控制台输出应该看起来像这样:

当我输入“你好”这个词时,这些应该是我的全部排放,而 200 毫秒定义了在累积重置之前应该经过多长时间。我该怎么做呢?

0 投票
1 回答
3071 浏览

apache-flink - Apache Flink:它如何处理背压?

对于一个算子来说,输入流比它的输出流快,所以它的输入缓冲区会阻塞前一个算子的输出线程,把数据传输给这个算子。正确的?

Flink 和 Spark 是否都通过阻塞线程来处理背压?那么它们之间有什么区别呢?

对于数据源来说,它一直在生产数据,如果它的输出线程被阻塞了怎么办?缓冲区会溢出吗?

0 投票
2 回答
1365 浏览

buffer - RxJava:如何实现一个带缓冲区的慢消费者和一个快速消费者到同一个源?

在处理各种背压场景时,我实现了一个情况,其中一个订阅者使用缓冲区很慢,而另一个订阅者消耗了扔给它的任何内容。那是使用 Scala 和 Akka Streams。如果您愿意,可以在此处查看代码以及在此处运行它的测试。

我通常会尝试开发一个 RxJava 版本进行比较,但我被困在了这个版本上。在 Akka Streams 中,我可以构建一个图,其中一个源在 2 个频道上进行广播,并从这些频道中消耗一个慢速接收器和一个快速接收器。每个通道都可以独立应用缓冲和节流。在 RxJava 中,有share用于广播的运算符,但缓冲和节流逻辑不在 . 上Subscriber,而是在Observable. 因此,我不确定如何应用缓冲和节流并且不会影响两个订阅者。Akka Streams 和 RxJava 都是 Rx 的实现,我希望有一种方法可以得到我想要的。

这是我正在尝试做的图片版本。

0 投票
1 回答
700 浏览

java - 即使在应用适当的运算符后,RxJava 代码中的 BackPressure 异常

我在我的 android 应用程序中使用 RxJava。我正在使用使用 interval() 函数的计时器,但即使我添加了 onBackPressureDrop(),我也会收到 Missing Backpressure 异常。我还向订阅者添加了 onError() 并将异常记录到 Crashlytics,但它仍然崩溃。请帮忙。我花了一周的时间来解决问题,但无济于事。代码偶尔会崩溃,我什至无法重现它。TraceonError 直接取自 rxjava git 上的官方问题。我认为它是负责打印 Crashlytics 异常的人,尽管我可能错了。

我的延迟 Observable -

我的观察-

traceonError()-

碰撞 -

0 投票
2 回答
945 浏览

android - RxJava onBackpressureBuffer 不发射项目

我目睹了 onBackpressureBuffer 的奇怪行为,我不确定它是有效行为还是错误。

我有一个 tcp 调用,它以一定的速率发射项目(使用流和 inputStream,但这只是为了一些信息)

最重要的是,我使用 create 创建了一个 observable,每次准备就绪时都会发出一个项目。

我们称之为messages()。

然后我这样做:

我注意到使用分析工具时很少抛出 MissingBackPressureException,因此我在调用中添加了 onBackpressureBuffer。

如果我在之后添加它observeOn

一切工作正常,但这意味着它只有在到达 UI 主线程后才会缓冲,所以我更喜欢它是这样的:

这就是事情开始变得奇怪的地方。

我注意到虽然messages()一直在发出项目,但在某些时候它们将停止交付给订阅者。

更准确地说,恰好在 16 个项目之后,显然正在发生的事情是缓冲区将开始保存项目而不将它们向前传递。

一旦我messages()使用某种超时机制取消了它,它将导致messages()发出onError(),并且缓冲区将立即发出它保留的所有项目(它们将被处理)。

我已经检查过是否是订户做太多工作的错,但不是,他已经完成了,但他仍然没有得到物品......

我也尝试过request(n)在订阅者中使用该方法,在完成后要求一项,onNext()但缓冲区不起作用。

我怀疑主 Android UI 线程的消息系统与背压导致了这种情况,但我无法解释原因。

有人可以解释为什么会这样吗?这是一个错误还是一个有效的行为?天呐!

0 投票
1 回答
47 浏览

scala - 基于资源稀缺性的背压可观测

在 RxJava 1 / RxScala 中,如何在以下情况下限制/背压可观察的源?

一个可能的解决方案就是阻塞直到。哪个有效,但这非常不雅,并且会阻止多个同时请求:

0 投票
1 回答
224 浏览

node.js - Node.js:在可写流中的背压上写入了错误的额外字节

我使用 Socket.io 进行了简单的二进制传输,将文件从客户端传输到服务器。我认为它有效,但我意识到文件的大小不同。在 writableStream.write 失败时,我附加了 Drain 事件处理程序以保持等待,直到它可以被重写并继续写入,但是每次发生 Drain 事件时,文件的大小都会增加触发 Drain 事件的次数,每 10240 字节大小我为每个块传输设置。

在我在这里写代码之前,我需要解释一下代码流程:

  1. 客户端请求上传文件
  2. 服务器创建空文件(创建可写流)并授权传输
  3. 客户端传输数据(块)直到结束
  4. 服务器用可写流写入块
  5. 客户端在全部发送后结束传输
  6. 服务器关闭可写流。
  7. 完毕!

这是服务器端代码:

它是客户端(添加了用于读取二进制文件的代码):

我用 4,162,611 字节大小的文件测试了这段代码,它有 1 次写入失败(1 次背压)。上传后,我检查了创建文件的大小,它是 4,172,851 字节,比原始文件大 10240 字节,它是块大小(10240)。

有时写入失败 2 次,大小比原来的大 20480 字节,是我发送的块大小的两倍。

我仔细检查了我的背压代码,但对我来说似乎没有错。我正在使用 Node v6.2.2 和 Socket.io v1.6.0,通过 Chrome 浏览器测试。有什么我错过的吗?还是我误解了背压?任何建议将不胜感激。

更新

看起来当背压发生时,它写了两次相同的数据(正如我在评论中所说)。所以我修改了这样的代码:

有效。我很困惑,因为当可写流无法写入时,它不会将数据写入流中,但实际上不是。有人知道吗?

0 投票
2 回答
3122 浏览

java - 使用“executeAsync”时如何限制对 cassandra 的写入请求?

我正在使用 datastax java driver 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在使用 QUORUM 一致性异步编写。

我上面的保存方法将以非常快的速度从多个线程中调用。

问题:

我想将请求限制为executeAsync异步写入 Cassandra 的方法的请求。如果我的写入速度超出我的 Cassandra 集群可以处理的速度,那么它将开始抛出错误,我希望我的所有写入都应该成功进入 cassandra,而不会造成任何损失。

我看到了这篇文章,其中解决方案是使用Semaphore固定数量的许可证。但我不确定如何以及实现它的最佳方法是什么。我以前从未使用过 Semaphor。这就是逻辑。任何人都可以根据我的代码提供一个基于信号量的示例,或者如果有更好的方法/选项,那么也请告诉我。

在编写数据加载程序的上下文中,您可以执行以下操作:

  • 为了简单起见,请使用 Semaphore 或其他具有固定数量许可的构造(这将是您的最大飞行请求数)。每当您使用 executeAsync 提交查询时,都需要获得许可。您实际上应该只需要 1 个线程(但可能需要引入一个 # cpu cores size 的池来执行此操作)从 Semaphore 获取许可并执行查询。它只会阻止获取,直到有可用的许可证。
  • 将 Futures.addCallback 用于从 executeAsync 返回的未来。回调应在 onSuccess 和 onFailure 情况下调用 Sempahore.release()。通过释放许可,这应该允许您在步骤 1 中的线程继续并提交下一个请求。

此外,我还看到了其他几篇他们谈到使用的帖子RingBuffer,或者Guava RateLimitter哪一个更好,我应该使用?以下是我能想到的选项:

  • 使用信号量
  • 使用环形缓冲区
  • 使用 Guava 速率限制器

谁能帮我举一个例子,说明我们如何限制请求或获得 cassandra 写入的背压并确保所有写入成功进入 cassandra?

0 投票
1 回答
2340 浏览

node.js - 什么是流处理的背压

我开始学习节点和流似乎是你经常使用的东西,在我读过的大多数文档中,当你处理大型文件时提到了“背压问题”,但我没有找到明确的解释这个问题到底是什么。我也读过使用管道可以帮助解决这个问题,但是管道究竟是如何解决背压问题的?

感谢您提前进行任何解释。

0 投票
1 回答
184 浏览

observable - RxJava 2:关于因 BackPressure 而丢弃的消息的统计信息?

我正在使用带有 DROP BackPressure 策略的 RxJava 2 Flowable。有什么方法可以收集有关由于 BackPressure 而实际丢弃了多少消息的信息/统计信息?

解决方案

Flowable#onBackpressureDrop(consumer -> {}); http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#onBackpressureDrop(io.reactivex.functions.Consumer)