问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
642 浏览

java - 事件调度线程“太慢”时 Java Swing UI 的迟缓行为

如果我使用 使用单独的线程从 Java 中充斥 Swing 事件队列SwingUtilities.invokeLater(..),则事件调度线程无法跟上步伐,事件队列填满并且应用程序运行缓慢。

例如......在下面的例子中,我只能看到屏幕上显示的几个整数(mac os-x yosemite,java 8 update 25):

有没有办法“放慢速度”,这样我才能真正看到 GUI 中的每一个变化?调用标签repaint后在某处调用某种方法?setText(..)

疯狂的闪烁是完全可以的,因为我希望我的眼睛比像素可以改变的“慢”。目前这个例子表现得“奇怪”,最后我看到一个特定的数字,比如让我们说 1070201 一整秒,然后我看到 1531452 一整秒。

当然,我可以使用SwingUtilities.invokeAndWait(..)而不是为SwingUtilities.invokeLater(..)“生产”主线程建立背压,以防止“填满”事件队列。难道没有别的办法了吗?

0 投票
2 回答
198 浏览

c# - 一个枚举器包装器,它预先缓冲来自底层枚举器的多个项目

假设我有一些IEnumerator<T>在方法内部进行了大量处理MoveNext()

从该枚举器消耗的代码不仅消耗与可用数据一样快,而且偶尔会等待(其细节与我的问题无关)以同步需要恢复消耗的时间。但是当它下一次调用 时MoveNext(),它需要尽可能快的数据。

一种方法是将整个流预先消耗到某个列表或数组结构中以进行即时枚举。然而,这将浪费内存,因为在任何一个时间点,只有一个项目正在使用,并且在整个数据不适合内存的情况下,这将是令人望而却步的。

那么.net中是否有一些通用的东西以某种方式包装枚举器/可枚举,它预先异步预迭代底层枚举器几个项目并缓冲结果,以便它始终在其缓冲区中有许多可用的项目和调用 MoveNext 将永远不必等待?显然,消耗的项目,即由调用者的后续 MoveNext 迭代的项目,将从缓冲区中删除。

注意我正在尝试做的部分工作也称为Backpressure,并且在 Rx 世界中,已经在RxJava中实现,并且正在Rx.NET中进行讨论。Rx(推送数据的 observables)可以被认为是枚举器的相反方法(枚举器允许拉取数据)。背压在拉取方法中相对容易,正如我的回答所示:只需暂停消费。推动时更难,需要额外的反馈机制。

0 投票
2 回答
487 浏览

java - 我如何“强制”某种背压来避免 rxjava 中的多次执行?

我有一段代码,他的工作是更新本地缓存。此缓存更新有两个触发器:

  1. 以固定间隔
  2. 要求时

所以这是一个关于我如何做到这一点的基本示例。

然后我有

这可行,但问题是当我开始有多个并发调用时forceReload():将不会并发执行reloadData()但元素将排队并且进程将循环重新加载数据,直到发送到的所有事件forceReloadEvents都已被消耗,即使forceReload()已经完成由于之前的事件发布了CountDownLatch.

我想使用onBackPressureDrop,但似乎没有诱发背压,也没有任何东西被丢弃。我想要的是某种强制背压的方法,以便合并了解一次只能处理一个元素,并且必须删除任何后续事件,直到当前执行完成。

我考虑过使用bufferor throttleFirst,但我不想强制每个事件之间有一个特定的时间,我宁愿根据重新加载缓存所需的时间进行自动缩放。您可以将其视为throttleFirst直到reloadData完成。

0 投票
0 回答
238 浏览

scala - Scala 并行处理文件的方式是什么,有背压?

以下代码将逐行读取文件,为每一行创建一个任务,然后将其排队到执行程序。如果执行程序的队列已满,则停止从文件中读取,直到再次有空间为止。

我查看了 SO 中的一些建议,但它们要么要求将文件的全部内容读入内存,要么要求不理想的调度(例如,读取 100 行,并行处理它们,只有在完成之后,才读取接下来的 100 行) . 我也不想为此使用像 Akka 这样的库。

没有这些缺点的 Scala 实现这一目标的方法是什么?

说明性定义executorWithBoundedQueue

0 投票
2 回答
3008 浏览

java - 如何在背压期间仅缓冲来自 rx.Observable 的最新发射

我有一个rx.Observable将任务进度发送到onNext(). onNext()排放有时会发生得如此之快,以至于无法Observer跟上,从而导致背压。我想通过仅缓冲来自Observable.

例如:

  • Observable发出1Observer接收1
  • 虽然Observer仍在处理1,但Observable发出234
  • Observer完成处理1并开始处理4(排放23被丢弃)。

这似乎是在 Rx Observable 中处理进度的常见情况,因为您通常只关心使用最新进度信息更新 UI。但是我一直无法弄清楚如何做到这一点。

任何人都知道如何使用 RxJava 实现这一点?

0 投票
1 回答
696 浏览

scala - 与 Akka Streams 同步反馈

我想要实现的是用 akka 流实现类似同步反馈循环的东西。

假设你有一个Flow[Int].filter(_ % 5 == 0). 当您将Int's 流广播到此流并直接在其后面压缩元组时,您会得到类似

有没有办法发出一个Option[Int],它指示在我推动下一个元素通过它之后流是否发出一个元素?

我想过实现我自己DetachedStage的右前后后Flow保持一个状态,每当流量拉到之前的舞台上时,我就知道他需要下一个元素。当后面的舞台没有收到元素时,它是None。

不幸的是,结果并不好,并且被许多职位所淘汰。

旁注

过滤器流只是一个例子,它可以是一个非常长的流,我无法提供Option在它的每个阶段发出的能力,所以我真的必须知道,流是否推动了下一个或没有而是从下游请求下一个

我也玩过conflateand expand,但这些结果的位置偏移更糟

我在配置中更改的一件事是流的initial缓冲区max,因此我可以确定指示的需求确实是在我推动它的元素之后。

很高兴获得有关如何解决此问题的一些建议!

0 投票
1 回答
967 浏览

scala - Akka HTTP 背压连接

文档Http().bindAndHandle()

没有对连接源应用背压,即所有连接都以最大速率被接受,这取决于应用程序,可能会带来 DoS 风险!

这同样适用于bindAndHandleAsync() bindAndHandleSync()

该文档还指出,即使是更高级别的系统,例如文件 IO 或 TCP,我认为位于 TCP 之上的 HTTP 也将通过反应流机制工作。

Http().bind()魔法功能吗?这会施加背压吗?

如何使用 akka-streams 公开背压 HTTP 端点?

0 投票
1 回答
851 浏览

rx-java - RxJava 背压和对生产者的调用次数

我正在尝试使用 rx Java 中的背压在我的 Android 应用程序中创建无限滚动。我希望它只调用外部服务请求的次数(调用后request(1))。但是在使用 flatmap 之后,每次subscribe加载 16 页。

在我的代码下面有预期的结果。几乎每个测试都因为第一次请求而失败(n=16)

0 投票
1 回答
1153 浏览

java - 用于内部微服务调用的消息总线与 Quasar/HTTP

我正在寻找优化当前使用 HTTP/REST 进行内部节点到节点通信的微服务架构。

一种选择是在服务中实现背压功能,(例如)通过将类似 Quasar 的东西集成到堆栈中。这无疑会改善情况。但我看到了一些挑战。一种是,异步客户端线程是瞬态的(在内存中),并且在客户端失败(崩溃)时,这些重试线程将丢失。第二,理论上,如果目标服务器宕机一段时间,客户端最终可能会到达 OOM 尝试重试,因为线程最终是有限的,甚至是 Quasar Fibers。

我知道这有点偏执,但我想知道基于队列的替代方案在非常大的规模上是否更有利。

它仍然可以像 Quasar/fibers 一样异步工作,除了 a) 队列是集中管理的,并且脱离客户端 JVM,b) 队列可以是持久的,因此在客户端和/或目标服务器出现故障时,没有飞行中的消息丢失了。

当然,排队的缺点是有更多的跃点,它会减慢系统的速度。但我认为可能存在一个甜蜜点,即 Quasar ROI 达到峰值,集中且持久的队列对于扩展和 HA 变得更加重要。

我的问题是:

是否讨论过这种权衡?是否有关于使用集中式外部队列/路由器方法进行服务内通信的论文。

TL;博士; 我刚刚意识到我可以将这个问题表述为:

“什么时候适合在微服务架构中使用基于消息总线的服务内通信而不是直接 HTTP。”

0 投票
1 回答
1303 浏览

java - 在这种情况下,为什么我们需要 Publish 和 RefCount Rx 运算符?

我正在尝试熟悉反应式背压处理的问题,特别是通过阅读此 wiki:https ://github.com/ReactiveX/RxJava/wiki/Backpressure

在缓冲区段落中,我们有这个更复杂的示例代码:

如果我理解正确,我们通过为缓冲区操作符生成去抖信号流来有效地去抖突发源流。

但是为什么我们需要在这里使用 publish 和 refcount 操作符呢?如果我们只是丢弃它们会导致什么问题?评论并没有让我更清楚,RxJava Observables 不是默认支持多播吗?