问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
5587 浏览

scala - Scala Akka Stream:如何通过 Seq

我正在尝试将一些阻塞调用包装在Future. 返回类型是Seq[User]where Useris a case class。以下内容无法与存在各种重载版本的投诉一起编译。有什么建议么?我尝试了几乎所有的变化都Source.apply没有任何运气。

0 投票
1 回答
2390 浏览

json - Akka HTTP:如何将 Json 格式响应解组为域对象

我正在尝试 Akka HTTP,并且我创建了一个服务,该服务在 HttpResponse 中返回域对象的 Json 数组。在客户端中,我想将其转换为域对象的源,以便后续流和接收器可以使用它。

参考 Json 支持部分: http ://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html

我已经完成了定义隐式 RootJsonReader 等的必要工作,但我不知道如何使用 FromEntityUnmarshaller。

我的代码在这里: https ://github.com/charlesxucheng/akka-http-microservice

它基于 akka-http-microservice 激活器模板。Service2.scala 是我的服务器实现并且正在运行。AkkaHttpClient.scala 是客户端实现,它是不完整的。

要构建,请使用 Gradle,因为 build.sbt 不是最新的。

谢谢。

0 投票
1 回答
1342 浏览

scala - Akka/Scala:你能解释一下这个 Akka Streams Flow 中发生了什么吗?

我正在研究 Akka 流,并做了一个斐波那契发布者 - 订阅者示例,如下所示。但是,我还不太明白需求最初是如何产生的,以及它与订阅者的请求策略有什么关系。有人可以解释一下吗?

斐波那契出版社:

斐波那契订阅者:

斐波那契应用程序:

样品运行:问题:最初对 4 的需求是从哪里来的?

0 投票
0 回答
717 浏览

java - 弹簧集成与反应流

我正在做一个 ETL 项目。我已经使用弹簧集成很长时间了。数据源当前是文件或编年史,但它可能会更改为实时流,并且数量可能会增长。未来有可能转向大数据解决方案(hadoop、spark 等)。

基于此,我需要在 spring 集成和反应流之间进行比较?为什么有人会使用一个而不是另一个(或者我首先试图比较两者是错的)?您认为它们可以一起使用的场景(如果有的话)?

0 投票
2 回答
2984 浏览

scala - Akka-Stream 实现比单线程实现慢

2015 年 10 月 30 日更新


基于 Roland Kuhn Awnser:

Akka Streams 使用 Actor 之间的异步消息传递来实现流处理阶段。跨异步边界传递数据具有您在此处看到的开销:您的计算似乎只需要大约 160ns(源自单线程测量),而流式解决方案每个元素大约需要 1µs,这主要由消息传递决定。

另一个误解是说“流”意味着并行性:在您的代码中,所有计算都在单个 Actor(映射阶段)中顺序运行,因此与原始单线程解决方案相比,没有任何好处。

为了从 Akka Streams 提供的并行性中受益,您需要有多个处理阶段,每个处理阶段执行以下任务

每个元素 1µs,另请参阅文档。

我做了一些改变。我的代码现在看起来像:

我不确定我是否完全错了,但我使用 akka-streams 的实现仍然慢得多(现在甚至比以前更慢),但我发现:如果我增加工作,例如通过做一些除法,用 akka 实现-streams 变得更快。因此,如果我做对了(否则请纠正我),我的示例中似乎有太多开销。因此,如果代码必须做繁重的工作,您只能从 akka-streams 中受益吗?




我在 scala 和 akka-stream 中都比较新。我写了一个小测试项目,它会创建一些事件,直到计数器达到特定数字。对于每个事件,正在计算事件的一个字段的阶乘。我实施了两次。一次使用akka-stream,一次没有akka-stream(单线程)并比较运行时。

我没想到:当我创建一个事件时,两个程序的运行时间几乎相同。但是如果我创建 70,000,000 个事件,那么没有 akka-streams 的实现会快得多。这是我的结果(以下数据基于 24 次测量):


  • 没有 akka-streams 的单个事件403 (+- 2)ms
  • 带有 akka-streams 的单个事件444 (+-13)ms


  • 没有 akka-streams 的 70Mio 事件11778 (+-70)ms

  • 70Mio 事件与 akka-steams : 75424(+-2959)ms

所以我的问题是:发生了什么事?为什么我的 akka-stream 实现速度较慢?

这是我的代码:

使用 Akka 实现

没有 Akka 的实现

对象单线程 {

共享功能

实施活动

0 投票
2 回答
1946 浏览

unit-testing - Project Reactor:等待广播员完成

有一个广播器,它接受字符串并将它们附加到 StringBuilder。

我想测试一下。

我不得不Thread#sleep等待,而广播员完成字符串的处理。我想删除sleep.

我尝试使用Control#debug()不成功。

0 投票
1 回答
135 浏览

javascript - 如何仅在流更改时过滤流?

响应式编程的新手。我有一个流,一个滚动流,绑定到一个 domNode,然后还有一些其他流通过过滤器订阅:

这输出如下:

你可以在这里看到:http: //jsbin.com/zedazapato/edit ?js,console,output

true我希望在数百个更改(从到)时输出这些新流false,而不是重复输出:

我尝试过使用Observable.distinctUntilChanged,但它的行为似乎不像我预期的那样(它似乎输出相同的东西):http: //jsbin.com/gibefagiri/1/edit ?js,console,output

我哪里错了?

0 投票
2 回答
1202 浏览

scala - Scala Slick:永无止境的流

使用 Slick,您可以执行以下操作以从表中生成结果流:

这将打印events表中的所有事件并在最后一行之后终止。

假设可以以某种方式通知您何时将新行输入到events表中,是否可以编写一个在插入事件时连续输出事件的流?一种tail -f用于数据库表。

我认为 Slick 本身不会支持这一点,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取某些内容直到它为空,然后等待事件指示表中的更多数据,然后流式传输新数据。可能通过使用ActorPublisher来绑定这个逻辑?

只是想知道是否有人在这方面有任何经验或任何建议?

0 投票
1 回答
6575 浏览

java - 如何为多个(10k - 100k)请求正确调用 Akka HTTP 客户端?

我正在尝试使用 Akka HTTP 2.0-M2 编写一个用于批量数据上传的工具。但我面临akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.

我试图找出一个问题,这里的示例代码也失败了:

它失败了:

我想那是因为我试图创建很多 Futures 并同时执行它们。但是 Akka 不应该启用背压吗?我想我用错了。我尝试了 superPool 方法,但没有任何改变,因为据我所知,Http.singleRequest里面有相同的池。我也尝试重用 Http 实例而不是Http.get()在循环中调用,但它也没有帮助。

触发一批请求的正确方法是什么?我计划批量执行 10 000 - 100 000 个请求。

0 投票
1 回答
5067 浏览

java - 反应式和反应式流之间有什么区别?

我试图了解 Reactive 和 ReactiveStreams 之间的区别,特别是在 RxJava 的上下文中?

我能想到的最多的是,Reactive Streams 在规范中有一些背压的概念,但在 RxJava/Reactive 中已经存在request(n)接口。

不介意 ELI5 的答案。