问题标签 [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Scala Akka Stream:如何通过 Seq
我正在尝试将一些阻塞调用包装在Future
. 返回类型是Seq[User]
where User
is a case class
。以下内容无法与存在各种重载版本的投诉一起编译。有什么建议么?我尝试了几乎所有的变化都Source.apply
没有任何运气。
json - Akka HTTP:如何将 Json 格式响应解组为域对象
我正在尝试 Akka HTTP,并且我创建了一个服务,该服务在 HttpResponse 中返回域对象的 Json 数组。在客户端中,我想将其转换为域对象的源,以便后续流和接收器可以使用它。
参考 Json 支持部分: http ://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html
我已经完成了定义隐式 RootJsonReader 等的必要工作,但我不知道如何使用 FromEntityUnmarshaller。
我的代码在这里: https ://github.com/charlesxucheng/akka-http-microservice
它基于 akka-http-microservice 激活器模板。Service2.scala 是我的服务器实现并且正在运行。AkkaHttpClient.scala 是客户端实现,它是不完整的。
要构建,请使用 Gradle,因为 build.sbt 不是最新的。
谢谢。
scala - Akka/Scala:你能解释一下这个 Akka Streams Flow 中发生了什么吗?
我正在研究 Akka 流,并做了一个斐波那契发布者 - 订阅者示例,如下所示。但是,我还不太明白需求最初是如何产生的,以及它与订阅者的请求策略有什么关系。有人可以解释一下吗?
斐波那契出版社:
斐波那契订阅者:
斐波那契应用程序:
样品运行:问题:最初对 4 的需求是从哪里来的?
java - 弹簧集成与反应流
我正在做一个 ETL 项目。我已经使用弹簧集成很长时间了。数据源当前是文件或编年史,但它可能会更改为实时流,并且数量可能会增长。未来有可能转向大数据解决方案(hadoop、spark 等)。
基于此,我需要在 spring 集成和反应流之间进行比较?为什么有人会使用一个而不是另一个(或者我首先试图比较两者是错的)?您认为它们可以一起使用的场景(如果有的话)?
scala - Akka-Stream 实现比单线程实现慢
2015 年 10 月 30 日更新
基于 Roland Kuhn Awnser:
Akka Streams 使用 Actor 之间的异步消息传递来实现流处理阶段。跨异步边界传递数据具有您在此处看到的开销:您的计算似乎只需要大约 160ns(源自单线程测量),而流式解决方案每个元素大约需要 1µs,这主要由消息传递决定。
另一个误解是说“流”意味着并行性:在您的代码中,所有计算都在单个 Actor(映射阶段)中顺序运行,因此与原始单线程解决方案相比,没有任何好处。
为了从 Akka Streams 提供的并行性中受益,您需要有多个处理阶段,每个处理阶段执行以下任务
每个元素 1µs,另请参阅文档。
我做了一些改变。我的代码现在看起来像:
我不确定我是否完全错了,但我使用 akka-streams 的实现仍然慢得多(现在甚至比以前更慢),但我发现:如果我增加工作,例如通过做一些除法,用 akka 实现-streams 变得更快。因此,如果我做对了(否则请纠正我),我的示例中似乎有太多开销。因此,如果代码必须做繁重的工作,您只能从 akka-streams 中受益吗?
我在 scala 和 akka-stream 中都比较新。我写了一个小测试项目,它会创建一些事件,直到计数器达到特定数字。对于每个事件,正在计算事件的一个字段的阶乘。我实施了两次。一次使用akka-stream,一次没有akka-stream(单线程)并比较运行时。
我没想到:当我创建一个事件时,两个程序的运行时间几乎相同。但是如果我创建 70,000,000 个事件,那么没有 akka-streams 的实现会快得多。这是我的结果(以下数据基于 24 次测量):
- 没有 akka-streams 的单个事件: 403 (+- 2)ms
带有 akka-streams 的单个事件:444 (+-13)ms
没有 akka-streams 的 70Mio 事件:11778 (+-70)ms
- 70Mio 事件与 akka-steams : 75424(+-2959)ms
所以我的问题是:发生了什么事?为什么我的 akka-stream 实现速度较慢?
这是我的代码:
使用 Akka 实现
没有 Akka 的实现
对象单线程 {
共享功能
实施活动
unit-testing - Project Reactor:等待广播员完成
有一个广播器,它接受字符串并将它们附加到 StringBuilder。
我想测试一下。
我不得不Thread#sleep
等待,而广播员完成字符串的处理。我想删除sleep
.
我尝试使用Control#debug()
不成功。
javascript - 如何仅在流更改时过滤流?
响应式编程的新手。我有一个流,一个滚动流,绑定到一个 domNode,然后还有一些其他流通过过滤器订阅:
这输出如下:
你可以在这里看到:http: //jsbin.com/zedazapato/edit ?js,console,output
true
我希望在数百个更改(从到)时输出这些新流false
,而不是重复输出:
我尝试过使用Observable.distinctUntilChanged
,但它的行为似乎不像我预期的那样(它似乎输出相同的东西):http: //jsbin.com/gibefagiri/1/edit ?js,console,output
我哪里错了?
scala - Scala Slick:永无止境的流
使用 Slick,您可以执行以下操作以从表中生成结果流:
这将打印events
表中的所有事件并在最后一行之后终止。
假设可以以某种方式通知您何时将新行输入到events
表中,是否可以编写一个在插入事件时连续输出事件的流?一种tail -f
用于数据库表。
我认为 Slick 本身不会支持这一点,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取某些内容直到它为空,然后等待事件指示表中的更多数据,然后流式传输新数据。可能通过使用ActorPublisher
来绑定这个逻辑?
只是想知道是否有人在这方面有任何经验或任何建议?
java - 如何为多个(10k - 100k)请求正确调用 Akka HTTP 客户端?
我正在尝试使用 Akka HTTP 2.0-M2 编写一个用于批量数据上传的工具。但我面临akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.
我试图找出一个问题,这里的示例代码也失败了:
它失败了:
我想那是因为我试图创建很多 Futures 并同时执行它们。但是 Akka 不应该启用背压吗?我想我用错了。我尝试了 superPool 方法,但没有任何改变,因为据我所知,Http.singleRequest
里面有相同的池。我也尝试重用 Http 实例而不是Http.get()
在循环中调用,但它也没有帮助。
触发一批请求的正确方法是什么?我计划批量执行 10 000 - 100 000 个请求。
java - 反应式和反应式流之间有什么区别?
我试图了解 Reactive 和 ReactiveStreams 之间的区别,特别是在 RxJava 的上下文中?
我能想到的最多的是,Reactive Streams 在规范中有一些背压的概念,但在 RxJava/Reactive 中已经存在request(n)
接口。
不介意 ELI5 的答案。