问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
2542 浏览

mysql - 如何在 Slick 中使用反应流来插入数据

Slick 的文档中,使用 Reactive Streams 的示例仅用于读取数据作为 DatabasePublisher 的一种方式。但是,当您想根据插入率将数据库用作 Sink 和 backpreasure 时会发生什么?

我寻找了等效的DatabaseSubscriber,但它不存在。所以问题是,如果我有来源,请说:

val source = Source(0 to 100)

如何使用 Slick 创建一个 Sink,将这些值写入具有架构的表中:

create table NumberTable (value INT)

0 投票
0 回答
148 浏览

scala - Akka Streams 使用 twitter4j-stream 的 NPE 失败

我正在试验 Akka 流和 twitter4j-stream。拥有一系列推文它可以正常工作,直到它失败的那一刻NPE,这对我来说没有意义。

最初我正在创建一个client

最后,我想得到具有以下签名的东西:val stream: Source[TwitterEvent, ShutdownHandler]所以我正在创建一个 ReactiveStreams Publisher,它将Source使用来自客户端的使用:

所以.filter()引导这个东西,我收到了一些推文,然后它失败了:

如果这很重要,我的TwitterListener

0 投票
2 回答
1652 浏览

mysql - 在 Reactive Streams Specification 1.0 发布后,jdbc 规范是否也会变得响应式?

我正在学习和使用带有akka流的反应流编程,我试图为async-jdbc-driver或reactive-jdbc-driver找到任何库2年,我发现slick 3.0或rxjava-jdbc-driver提供异步jdbc api,但我知道 slick 正在 JDBC api 之上构建令人惊叹的 api,它是阻塞的(如果我错了,请纠正我),所以我猜从系统的角度来看,它可能不是从上到下的 100% 反应式系统。

另一个惊人的事件是去年发布了“Reactive Streams Specification V1.0”,所以我的问题是:

  1. 该事件是否会触发 JDBC 专家组来设计异步 JDBC API 支持?
  2. 那么Database Provider Organizations,比如MySQL的供应商Oracle,有没有计划实现相应的驱动呢?
  3. 如果这是没有希望的,任何方向或替换或任何我可以变成的东西或 JDBC 层都不必是反应性的,横向扩展的 mysql 服务器就足够了吗?
0 投票
1 回答
2127 浏览

java - 集成测试 Spring SseEmitters

我一直在寻找有关如何最好地测试返回 SseEmitters 的 Spring MVC 控制器方法的提示。我提出的很短,但是有一个试错解决方案,可以针对异步线程行为进行测试。下面是示例代码,只是为了演示概念,可能有一两个错字:

控制器类:

测试类:

如评论中所述,调用 asyncResult() 以确保发布者完成其工作并在测试完成之前发送两个响应。没有它,内容检查会因为内容中只有一个响应而失败。但是没有实际结果要检查,因此 asyncResult 为空。

我的具体问题是是否有更好、更精确的方法来强制测试等待异步进程完成,而不是这里的等待不存在的 asyncResult 的 klugie 方法。我更广泛的问题是,与这些异步函数相比,是否有其他库或 Spring 方法更适合于此。谢谢!

0 投票
1 回答
719 浏览

scala - 从 EventStream 中创建源

我正在使用 PlayFramework 2.5.3 并想akka.stream.scaladsl.Source从一个akka.event.EventStream(事件流是演员系统的一部分)创建一个。事件流会产生某种类型的事件,因此我需要订阅该某种类型的事件并使用play.api.mvc.Results.chunked. Source有没有什么简单的方法可以使用 Akka Streams 2.4.5创建这样的?

0 投票
1 回答
195 浏览

java - cyclops-react:ReactiveSeq 上没有批处理功能?

使用 cyclops-react 1.0.0-RC3,我尝试使用批处理重新创建 cyclops-react流用户指南中的示例。我发现其中缺少一些方法ReactiveSeq,包括batchBySizewindowByTime

我确实找到了这些方法StreamUtils并且它们按预期工作,但看起来不像用户指南中的示例那么光滑......

从用户指南...

我能得到什么工作......

testBatchingSlidingWindowing您可以在方法测试类StreamsTest.java中的工作 JUnit 中查看我的代码

我应该期望找到batchBySizeand windowByTimeonReactiveSeq还是使用StreamUtils适当的方式?

0 投票
1 回答
640 浏览

streaming - 带有默认调度程序的反应式卡夫卡?

我正在使用reactive-kafka连接器使用 Kafka 和 Akka Streams 的项目。我们发现 reactive-kafka 使用它自己的调度程序(akka.kafka.default-dispatcher),但如果我们使用默认的 akka 调度程序,一切都会更快(reactive-kafka 调度程序 ~300 消息/秒,默认调度程序~1300 条消息/秒)

我想知道使用默认调度程序是否安全。

提前致谢。

0 投票
0 回答
498 浏览

scala - Akka 流忽略 ActorSubscriber 上的 onComplete 事件

我有一个用例,我有固定数量的元素从源流入。这些元素得到处理,然后有一个 Sink,在我们的例子中它是一个 ActorSubscriber。发生的情况是 Source 在发送所有元素后发出完成事件的信号。此事件传播到接收器,接收器触发 onComplete 事件并关闭流工作流。所有这些都发生在元素仍在中间步骤中处理的时候。

一旦中间步骤返回处理后的数据时处理完成,它就找不到订阅者,因为它已经由于触发了 onComplete 事件而关闭。有没有办法我们可以丢弃演员订阅者上的 onComplete 事件并使其始终保持打开状态。

代码片段:

演员订阅者:

评分完成处理工作。正在发生的是具有 3 个元素的源将完整事件发送到接收器。actorSubscriber 的 onComplete 事件在它从源接收到完成事件后被触发。在处理完评分引擎将处理后的结果发回给 actorSubscriber,但由于它不再活跃,我们会收到 deadLetter 消息。

0 投票
2 回答
121 浏览

scala - Akka Reactive Streams 总是落后一条消息

出于某种原因,我的 Akka 流总是在“发出”(?)第一条消息之前等待第二条消息。

这是一些演示我的问题的示例代码。

产生输出:

我想要的是:

0 投票
0 回答
127 浏览

playframework - 玩!Java:通过Websockets从流中推送消息

我们想使用 websocket 将通过 Kafka 流接收到的消息推送到客户端。

我们最初的想法是使用 Play!Java 框架用于提供 HTTP 接口,Akka 用于反应式流处理并通过 Play 的 Iteratee 推送消息,但 Iteratee api 似乎不适用于 Play!爪哇。有解决办法吗?