问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
5 回答
6341 浏览

c# - 使用 BlockingCollection: OperationCanceledException,有没有更好的办法?

我正在将(坦率地说很棒)BlockingCollection<T>类型用于高度多线程的高性能应用程序。

通过集合有很多吞吐量,并且在微观层面上它是高性能的。但是,对于每个“批次”,它总是通过标记取消令牌来结束。这会导致在任何等待Take调用时引发异常。这很好,但我会选择返回值或输出参数来发出信号,因为 a) 异常有明显的开销,b) 在调试时,我不想手动关闭特定的中断异常例外。

实现似乎很激烈,理论上我想我可以反汇编并重新创建我自己的不使用异常的版本,但也许有一种不太复杂的方法?

我可以null在集合中添加一个(或者如果不是,一个占位符)对象来表示进程应该结束,但是还需要一种很好地中止的方法,即唤醒等待线程并以某种方式告诉它们发生了什么事。

那么 - 替代集合类型?重新创建我自己的?有什么方法可以滥用这个?

(一些上下文:我选择了它,BlockingCollection<T>因为它比手动锁定 a 具有优势Queue。尽我所知,线程原语的使用非常棒,在我的情况下,这里和那里的几毫秒和最佳核心至关重要。 )

编辑:我刚刚为此开了一笔赏金。我不相信Anastasiosyal 的回答涵盖了我在评论中提出的问题。我知道这是一个棘手的问题。有人可以提供帮助吗?

0 投票
0 回答
316 浏览

node.js - Node.js node-csv-parser 死锁,因为背压不起作用

我正在尝试从 CSV 文件中读取 URL,检查它们是否可用,并将错误的 URL 写入另一个文件。我经历了大约几千次,然后急剧减速(从每秒检查 5 次到每 10 秒检查一次),然后得到“FATAL ERROR: CALL_AND_RETRY_2 Allocation failed - process out of memory”,大概是因为我没有申请正确地对发射流进行背压。

0 投票
1 回答
3881 浏览

akka - 如何使用 Akka BoundedMailBox 限制生产者

我有两个演员,一个正在生产消息,另一个正在以固定速率消费消息。

是否可以让生产者被消费者 BoundedMailBox 限制?(背压)

我的生产者目前是定期安排的(向它发送一条滴答消息),有没有办法让它在消费者邮箱中按可用性安排?

我使用的是一劳永逸的风格( consumer.tell() ),因为我不需要回应。我应该使用不同的消息发送方法吗?

0 投票
2 回答
267 浏览

android - 后按后停止应用程序不正确

按下后退按钮后的日志:

11-07 22:48:08.376: D/AndroidRuntime(5325): 关闭 VM 11-07 22:48:08.376: W/dalvikvm(5325): threadid=1: 线程退出未捕获异常 (group=0x4162d700) 11 -07 22:48:08.384: E/AndroidRuntime(5325): 致命异常: main 11-07 22:48:08.384: E/AndroidRuntime(5325): java.lang.RuntimeException: 无法销毁活动

{com.some.pack/com.some.packNY}:java.lang.NullPointerException 11-07 22:48:08.384:E/AndroidRuntime(5325):在 android.app.ActivityThread.performDestroyActivity

(ActivityThread.java:3627) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 android.app.ActivityThread.handleDestroyActivity(ActivityThread.java:3645) 11-07 22:48:08.384: E/AndroidRuntime (5325): 在 android.app.ActivityThread.access$1200(ActivityThread.java:153) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 android.app.ActivityThread$H.handleMessage(ActivityThread.java :1322) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 android.os.Handler.dispatchMessage(Handler.java:99) 11-07 22:48:08.384: E/AndroidRuntime(5325):在 android.os.Looper.loop(Looper.java:137) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 android.app.ActivityThread.main(ActivityThread.java:5289) 11-07 22 :48:08.384: E/AndroidRuntime(5325): 在 java.lang.reflect.Method.invokeNative(Native Method) 11-07 22:48:08.384: E/AndroidRuntime(5325):在 java.lang.reflect.Method.invoke(Method.java:525) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run

(ZygoteInit.java:739) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 com.android.internal.os.ZygoteInit.main(ZygoteInit.java:555) 11-07 22:48:08.384 : E/AndroidRuntime(5325): at dalvik.system.NativeStart.main(Native Method) 11-07 22:48:08.384: E/AndroidRuntime(5325): 引起:java.lang.NullPointerException 11-07 22:48 :08.384: E/AndroidRuntime(5325): 在 com.some.pack.NY.stop(NY.java:100) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 com.some.pack。 onDestroy(NY.java:106) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 android.app.Activity.performDestroy(Activity.java:5302) 11-07 22:48:08.384: E/ AndroidRuntime(5325):在 android.app.Instrumentation.callActivityOnDestroy

(Instrumentation.java:1117) 11-07 22:48:08.384: E/AndroidRuntime(5325): 在 android.app.ActivityThread.performDestroyActivity

(ActivityThread.java:3614) 11-07 22:48:08.384: E/AndroidRuntime(5325): ... 11 更多 11-07 22:48:08.392: W/ActivityManager(408): 强制完成活动 com.some .pack/.MainActivity 11-07 22:48:09.079:W/ActivityManager(408):ActivityRecord{41985990 u0 的活动暂停超时

com.some.pack/.MainActivity} 11-07 22:48:09.095:W/Settings(4780):设置 bugreport_in_power_menu 已从 android.provider.Settings.Secure 移至

android.provider.Settings.Global。11-07 22:48:09.095:E/Cryptfs(124):未运行加密,中止 11-07 22:48:09.923:I/Process(5325):发送信号。PID: 5325 SIG: 9 11-07 22:48:09.946: W/InputDispatcher(408): channel '419e2ff0 com.some.pack/com.some.pack.MainActivity (server)' ~ Consumer

关闭输入通道或发生错误。events=0x9 11-07 22:48:09.946: E/InputDispatcher(408): channel '419e2ff0 com.some.pack/com.some.pack.MainActivity (server)' ~ Channel is

不可恢复地破碎,将被处理!

0 投票
1 回答
2308 浏览

node.js - Node.JS 无界并发/基于 TCP 的流背压

据我了解,Node 事件 IO 模型的后果之一是,一旦您连接了接收事件处理程序(或否则开始监听数据)。

如果接收方不能足够快地处理传入的数据,则可能会导致“无限并发”,即引擎盖下的节点继续尽可能快地从套接字读取数据,在事件循环上调度新的数据事件而不是阻塞套接字,直到进程最终耗尽内存并死亡。

接收方不能告诉节点放慢它的读取速度,否则会允许 TCP 的内置流量控制机制启动并向发送方指示它需要放慢速度。

首先,到目前为止我所描述的是否准确?有什么我错过的东西可以让节点避免这种情况吗?

Node Streams 备受吹捧的功能之一是自动处理背压。

AFAIK,(tcp 套接字的)可写流可以判断它是否需要减速的唯一方法是查看socket.bufferSize(指示写入套接字但尚未发送的数据量)。鉴于接收端的 Node 总是尽可能快地读取,这只能表明发送方和接收方之间的网络连接速度较慢,而不是接收方是否跟不上。

其次,Node Streams 自动背压能否在这种情况下以某种方式工作以处理无法跟上的接收器?

这个问题似乎也影响了通过 websockets 接收数据的浏览器,原因类似,websockets API 没有提供一种机制来告诉浏览器减慢它从套接字读取的速度。

对于 Node(和使用 websockets 的浏览器)来说,这个问题的唯一解决方案是在应用程序级别实现手动流控制机制,明确告诉发送进程放慢速度吗?

0 投票
0 回答
236 浏览

netty - 使用网络传输层的风暴拓扑中的流量控制

我创建了一个不可靠的小型拓扑结构,其中一个 spout 从包含经纬度坐标的文件中读取行,其中一个下游螺栓调用外部反向地理编码服务来确定国家/地区。因为这个特定的螺栓在一段时间后以非常慢的速度处理元组,所以整个拓扑停止(不产生输出)。

(1) 我想知道当螺栓无法处理元组的传入速率时会发生什么。据我了解,storm 是基于推送的,这意味着 spout 在循环中连续发出元组,并将它们存储在每个工作程序/执行程序的下游发送和接收缓冲区/队列中。当这些缓冲区/队列完全填满时会发生什么?spout 是否停止发出新的元组?由于从 0mq 到 netty 传输层的转换,这个实现是否发生了变化?

(2) 已经提到,在不可靠的拓扑结构中,在 Storm 中进行流控制的唯一方法是使用 acking 系统和 max spout 挂起参数在 spout 上发出带有 id 的元组,而无需在 ack/fail 方法中执行任何操作. 这是因为 0mq 传输层存在一些限制。现在storm > 0.9使用了netty传输层,还有其他方法可以在不可靠的拓扑中进行流量控制吗?

先感谢您

0 投票
1 回答
895 浏览

javascript - WebRTC DataChannel 流/控制/背压

RTCDataChannel API 不提供任何类型的流/控制或背压,这是否意味着发送方理论上可以使接收方的浏览器崩溃?在我看来,浏览器(Chrome、Firefox 等都在后台使用 SCTP)从 SCTP 连接中读取数据并安排运行 js-callback 来消耗数据包。如果事件队列跟不上发送者的速度,浏览器基本上会不断地读取数据包,同时将数据包存储在一个无限增长的缓冲区中。因此,当您连接两个浏览器时,发送方实际上总是可以压倒另一个,因为没有 TCP 接收窗口或类似的障碍。

这也适用于 websocket api。

我只是错过了一些东西还是这些 API 只是坏了?如果我是对的,那么在与未经身份验证的浏览器交谈时(例如在洪流场景中),这将是一个严重的安全问题。

0 投票
0 回答
267 浏览

node.js - 检查 Node.js 流中的当前背压水平

Node.js 中有没有办法检查 Stream 的背压状态?例如,将其与高水位线进行比较。

我的用例在单元测试中,我想确保转换流插件正在耗尽管道并且不会在输出流上建立背压。

0 投票
2 回答
581 浏览

scala - Play WebSockets 是否支持背压?

我需要在我的 Play 2.3 应用程序中添加一个 WebSocket 到 TCP 代理,但是虽然使用 Akka I/O 的传出 TCP 连接支持背压,但我看不到 WebSocket 的任何内容。基于演员的 API 显然不支持,但James Roper 说

Iteratee 通过设计来处理这个问题,在它返回的最后一个 future 被赎回之前,您不能将新元素提供给 iteratee,因为在此之前您没有对它的引用。

但是,我不明白他指的是什么。Iteratee.foreach,如示例中使用的那样,似乎太简单了。我在 iteratee API 中看到的唯一期货是用于完成计算结果。我应该Future[Unit]为每条消息完成一个还是什么?

0 投票
1 回答
385 浏览

node.js - 如何在 node.js 中实现正确处理背压的流?

我一生都无法弄清楚如何实现正确处理背压的。你不应该使用暂停和恢复吗?

我有这个实现,我正在尝试正常工作:

它正确发送输出,但没有正确产生背压。如何更改它以正确支持背压?