问题标签 [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rx-java - 如何在 Rxjava2 中获取有关背压的实际最新事件?Flowable.onBackpressureLatest() 没有按预期工作
当生产者产生事件的速度快于客户消费时。
我认为将Flowable与onBackpressureLatest()一起使用,我可以获得最新的事件发出。
但事实证明有一个大小为 128 的默认缓冲区。我得到的是以前缓冲的过时事件。
那么我怎样才能得到实际的最新事件呢?
这是示例代码:
我所期望的:
我得到了什么:
kotlin - Kotlin 协程:并发执行节流
想象一下,我们正在从消息队列中读取消息,并在接收时将它们推送到线程池中进行处理。线程数量有限,所以如果所有线程都忙,我们就会有自然的背压。
在 Kotlin 协程世界中如何解决这个问题?如果我们为每条传入消息创建一个协程,我们很快就会遇到内存不足错误(例如,如果每个任务都需要从数据库加载一些数据)和其他问题。
有没有解决这个问题的机制或模式?
redis - 如何高效地将数据从 flink 管道写入 redis
我正在 Apache flink sql api 中构建管道。管道执行简单的投影查询。但是,我需要在查询之前和查询之后编写一次元组(确切地说是每个元组中的一些元素)。事实证明,我用来写入 redis 的代码会严重降低性能。即 flink 以非常小的数据速率产生背压。我的代码有什么问题以及如何改进。请有任何建议。
当我停止写redis之前和之后的表现非常好。这是我的管道代码:
附加部分:我尝试了第二个实现,使用 Jedis 批量编写 toredis 的过程功能。但是我收到以下错误。org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: 套接字未连接。我试图使批处理消息的数量更小,但一段时间后我仍然收到错误。
下面是流程函数的实现:
/** * 使用进程函数写入redis */
scala - Akka-streams 通过异步处理对广播产生背压
我正在努力理解 akka-stream 是否在 Source 上强制执行背压时,一个分支的广播在图中需要大量时间(异步)。
我尝试查看是否在源buffer
上batch
施加了任何背压,但看起来不像。我也尝试过冲洗System.out
,但它没有改变任何东西。
当我运行程序时,我希望在控制台中看到“Batched ...”,并且在某些时候它会暂时卡住,因为 f4 的速度不足以处理这些值。目前,这些都没有像预期的那样表现,因为数字是连续生成的,没有批处理。
编辑:我注意到一段时间后,批处理消息开始在控制台中打印出来。我仍然不知道为什么它不会尽快发生,因为第一个元素应该发生背压
java - 在单个调度程序上观察到多个 Flowable,事件的消耗顺序与发出的顺序不同
我正在使用来自不同外部源/订阅包装的事件来使用不同的 Flowable。来源无关紧要,因为我可以通过简单的循环重现该问题。我有 :
- 不同的FlowableEmitter(3个足以重现)
- 发射器的单线程(下面是主线程)
- 订阅者的单线程 (newSingleThreadExecutor)
这是一个简单的代码重现
我的问题是事件的使用顺序与它们发出的顺序不同。如果我删除了 observeOn(scheduler) 它工作正常,但我需要在不同的线程上发射和订阅。我还测试了不同的 BackpressureStrategy 并没有帮助。
任何线索都可以按顺序订阅/打印所有号码(1,2,3,4,5...14,15),而不是我下面的
如果有问题,我正在使用 rx-java 2.2.5 和 Java 8。
node.js - NodeJS WebSockets (ws) 模块是否实现背压?
我正在使用该ws
模块在 NodeJS 上实现 WebSockets 服务器。服务器应每分钟向所有客户端发送一次更新。我已经实现了这个,但我对它在客户端连接可能停止的情况下的功能有些担心。
我担心当与客户端的连接变为非活动状态时会发生什么,例如由于网络连接以不发送 TCP RST 或 FIN 的方式中断。
我有点惊讶于方法中没有使用关键字send()
调用该方法。该方法是否只是将所有要发送的数据排队?如果套接字缓冲区已满,会以一种导致其他客户端饥饿的方式阻塞,而不是被阻塞的客户端怎么办?await
async
send()
send()
如果send()
从不阻塞,如果数据排队,排队,排队......会发生什么?它可以使用不断增加的无限内存吗?
理想情况下,如果最后一次更新尚未完全发送,我想省略发送每分钟一次的更新。我可以通过ws
模块实现这一点吗?
scala - Akka 流 actor-conflation-ratelimit-actor 丢弃前几条消息(有时)
一个简单的合并组合(如下)有时会在 staartup 打印一条调试消息,说它由于零需求而丢弃消息。我希望合并阶段能够提供无限的需求,所以上述情况绝不应该是这样。我错过了什么?
reactive-programming - 如何为反应式网络库实现背压?
关于我的套接字反应库工作过程中的问题,有一个很长的背景故事。
socket 库主要基于一个名为Monix
(类似 ReactiveX)的响应式库。Monix 具有通过(扩展 Future)类型处理背压的最佳实践,Ack
该类型在处理当前消息时发出下一条消息。这是一个很好的机制来保护系统的大消息匆忙。
目前设计,每一个socket连接都是一个Observable(或Stream),Observable在解析TCP/IP网络字节流时会创建一个协议消息,然后将协议消息推送给Subscriber。
问题是 Monix 库只能为每个 Observable 做背压。如果您认为有数千个客户端连接,那么云会有这么多的 Observable 和背压是没有意义的。
那么,如何针对单个 Observable 之外的全局系统,为这样的响应式系统设计背压机制呢?
谢谢
scala - Monix 如何通过 flatMap 运算符使用背压?
Monix 使用 Ack 同步发出的消息,但如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随source
.
请参阅此测试代码:
输出:
其中出现一些背压不匹配:
after: sleep 2 second for every message ...
backpressure give 3 of itemafter backpressure map - ...
就背压而言, 怎么可能sleep 2 second for every message ...
有一对一的关系?after backpressure map - ...
还有一个疑惑:为什么要记录sleep 2 second for every message
输出(0, 72), (0, 75), (0,78)
但这样的事情(0, 72), (1, 73), (2,74)
?
谢谢。
莫尼克斯版本:
"io.monix" %% "monix" % "3.0.0-RC1"
java - 在 Reactor 应用程序中测试对 Flux 的背压应用程序
我正在使用 Reactor 从 Kafka 主题中读取数据。每条消息的细化请求对 MongoDB 的查询,这比从 Kafka 主题读取消息要慢。因此,我对流应用了背压处理。
我正在使用 aConnectableFlux
来为生产者提供多个订阅者KafkaReceiver
。KafkaReceiver
本机不允许超过一个订阅者。
我需要测试我的代码是否正确地将背压应用于流。我怎么能做到这一点,使用一些集成测试?
谢谢大家。