问题标签 [backpressure]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
150 浏览

rx-java - 如何在 Rxjava2 中获取有关背压的实际最新事件?Flowable.onBackpressureLatest() 没有按预期工作

当生产者产生事件的速度快于客户消费时。

我认为将FlowableonBackpressureLatest()一起使用,我可以获得最新的事件发出。

但事实证明有一个大小为 128 的默认缓冲区。我得到的是以前缓冲的过时事件。

那么我怎样才能得到实际的最新事件呢?

这是示例代码:

我所期望的:

我得到了什么:

0 投票
1 回答
1725 浏览

kotlin - Kotlin 协程:并发执行节流

想象一下,我们正在从消息队列中读取消息,并在接收时将它们推送到线程池中进行处理。线程数量有限,所以如果所有线程都忙,我们就会有自然的背压。

在 Kotlin 协程世界中如何解决这个问题?如果我们为每条传入消息创建一个协程,我们很快就会遇到内存不足错误(例如,如果每个任务都需要从数据库加载一些数据)和其他问题。

有没有解决这个问题的机制或模式?

0 投票
2 回答
1443 浏览

redis - 如何高效地将数据从 flink 管道写入 redis

我正在 Apache flink sql api 中构建管道。管道执行简单的投影查询。但是,我需要在查询之前和查询之后编写一次元组(确切地说是每个元组中的一些元素)。事实证明,我用来写入 redis 的代码会严重降低性能。即 flink 以非常小的数据速率产生背压。我的代码有什么问题以及如何改进。请有任何建议。

当我停止写redis之前和之后的表现非常好。这是我的管道代码:

附加部分:我尝试了第二个实现,使用 Jedis 批量编写 toredis 的过程功能。但是我收到以下错误。org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: 套接字未连接。我试图使批处理消息的数量更小,但一段时间后我仍然收到错误。

下面是流程函数的实现:

/** * 使用进程函数写入redis */

0 投票
1 回答
784 浏览

scala - Akka-streams 通过异步处理对广播产生背压

我正在努力理解 akka-stream 是否在 Source 上强制执行背压时,一个分支的广播在图中需要大量时间(异步)。

我尝试查看是否在源bufferbatch施加了任何背压,但看起来不像。我也尝试过冲洗System.out,但它没有改变任何东西。

当我运行程序时,我希望在控制台中看到“Batched ...”,并且在某些时候它会暂时卡住,因为 f4 的速度不足以处理这些值。目前,这些都没有像预期的那样表现,因为数字是连续生成的,没有批处理。

编辑:我注意到一段时间后,批处理消息开始在控制台中打印出来。我仍然不知道为什么它不会尽快发生,因为第一个元素应该发生背压

0 投票
1 回答
55 浏览

java - 在单个调度程序上观察到多个 Flowable,事件的消耗顺序与发出的顺序不同

我正在使用来自不同外部源/订阅包装的事件来使用不同的 Flowable。来源无关紧要,因为我可以通过简单的循环重现该问题。我有 :

  • 不同的FlowableEmitter(3个足以重现)
  • 发射器的单线程(下面是主线程)
  • 订阅者的单线程 (newSingleThreadExecutor)

这是一个简单的代码重现

我的问题是事件的使用顺序与它们发出的顺序不同。如果我删除了 observeOn(scheduler) 它工作正常,但我需要在不同的线程上发射和订阅。我还测试了不同的 BackpressureStrategy 并没有帮助。
任何线索都可以按顺序订阅/打印所有号码(1,2,3,4,5...14,15),而不是我下面的

如果有问题,我正在使用 rx-java 2.2.5 和 Java 8。

0 投票
1 回答
1063 浏览

node.js - NodeJS WebSockets (ws) 模块是否实现背压?

我正在使用该ws模块在 NodeJS 上实现 WebSockets 服务器。服务器应每分钟向所有客户端发送一次更新。我已经实现了这个,但我对它在客户端连接可能停止的情况下的功能有些担心。

我担心当与客户端的连接变为非活动状态时会发生什么,例如由于网络连接以不发送 TCP RST 或 FIN 的方式中断。

我有点惊讶于方法中没有使用关键字send()调用该方法。该方法是否只是将所有要发送的数据排队?如果套接字缓冲区已满,会以一种导致其他客户端饥饿的方式阻塞,而不是被阻塞的客户端怎么办?awaitasyncsend()send()

如果send()从不阻塞,如果数据排队,排队,排队......会发生什么?它可以使用不断增加的无限内存吗?

理想情况下,如果最后一次更新尚未完全发送,我想省略发送每分钟一次的更新。我可以通过ws模块实现这一点吗?

0 投票
1 回答
72 浏览

scala - Akka 流 actor-conflation-ratelimit-actor 丢弃前几条消息(有时)

一个简单的合并组合(如下)有时会在 staartup 打印一条调试消息,说它由于零需求而丢弃消息。我希望合并阶段能够提供无限的需求,所以上述情况绝不应该是这样。我错过了什么?

0 投票
0 回答
149 浏览

reactive-programming - 如何为反应式网络库实现背压?

关于我的套接字反应库工作过程中的问题,有一个很长的背景故事。

socket 库主要基于一个名为Monix(类似 ReactiveX)的响应式库。Monix 具有通过(扩展 Future)类型处理背压的最佳实践,Ack该类型在处理当前消息时发出下一条消息。这是一个很好的机制来保护系统的大消息匆忙。

目前设计,每一个socket连接都是一个Observable(或Stream),Observable在解析TCP/IP网络字节流时会创建一个协议消息,然后将协议消息推送给Subscriber。

问题是 Monix 库只能为每个 Observable 做背压。如果您认为有数千个客户端连接,那么云会有这么多的 Observable 和背压是没有意义的。

那么,如何针对单个 Observable 之外的全局系统,为这样的响应式系统设计背压机制呢?

谢谢

0 投票
1 回答
228 浏览

scala - Monix 如何通过 flatMap 运算符使用背压?

Monix 使用 Ack 同步发出的消息,但如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随source.

请参阅此测试代码:

输出:

其中出现一些背压不匹配:
after: sleep 2 second for every message ...backpressure give 3 of itemafter backpressure map - ...

就背压而言, 怎么可能sleep 2 second for every message ...有一对一的关系?after backpressure map - ...

还有一个疑惑:为什么要记录sleep 2 second for every message 输出(0, 72), (0, 75), (0,78)但这样的事情(0, 72), (1, 73), (2,74)

谢谢。

莫尼克斯版本: "io.monix" %% "monix" % "3.0.0-RC1"

0 投票
1 回答
499 浏览

java - 在 Reactor 应用程序中测试对 Flux 的背压应用程序

我正在使用 Reactor 从 Kafka 主题中读取数据。每条消息的细化请求对 MongoDB 的查询,这比从 Kafka 主题读取消息要慢。因此,我对流应用了背压处理。

我正在使用 aConnectableFlux来为生产者提供多个订阅者KafkaReceiverKafkaReceiver本机不允许超过一个订阅者。

我需要测试我的代码是否正确地将背压应用于流。我怎么能做到这一点,使用一些集成测试

谢谢大家。