问题标签 [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Akka Stream:无法处理来自测试的文件,在主类中工作正常
这就是我的代码的样子
当我运行它时,我得到的输出为
现在我为它写测试
和
当我运行这个测试时,我看到
并且没有生成输出,我错过了什么?
scala - 如何调试 Akka-Stream 管道?
我正在尝试为日志行看起来像的日志文件构建处理管道
对于这个特定的日志文件,我有大约193705条要处理的日志行。
我创建了一个最初看起来像的流程图
但后来我意识到我在 Sink 中变得越来越少
所以我把我的图表做成线性的,以确保所有的线都通过管道。我当前的代码看起来像
和
当我再次运行我的程序时,它再次生成接近上述数字的行(不完全相同)
我很困惑,想在这里得到一些帮助
- 流不会失败或抛出异常,但仍会生成更少的输出行,我该如何调试?
- 是1500
maximumFrameLength
,但是日志行可以大于(更多字符)。这会是个问题吗?那我该如何解决这种情况呢? - 我如何确认?
Materializer
根据我的代码,当我打电话时我没有回来run()
,所以我无法关闭ActorSystem
,我在那里错过了什么?
更新
我刚刚发现它停止了有2564 chars(bytes)
和
所以它停止line 15071
了2564 bytes
。
但为什么它不抛出异常?我该如何处理?
java - 应该在哪里为 ActorPublisher 指定 supervisorStrategy?
我创建了ActorPublisher
一个源流。我SupervisionStrategy
为我的流的 Materializer 设置:
但它从不使用 created 策略,何时ActorPublisher
抛出异常。我也尝试supervisorStrategy()
在MyActorPublisher
. 但我知道它只用于儿童演员,它不起作用。
python - Scala读取连续的http流
如何连接并读取 scala 中的连续(分块)http 流?例如,如果我有这个用 python/bottle 编写的简单服务:
我打算用它akka-stream
来处理数据,我只需要一种方法来检索它。
scala - akka-http 发送连续的分块 http 响应(流)
我有这个带有akka-http
客户端和服务器的粗略测试示例。
服务器.scala:
客户端.scala:
目前Server
只回复一个“测试”。
如何更改输入HttpResponse
以Server
每 1 秒在无限循环中将“测试”作为分块(流)发送?
java - 当 actorSubscriber 死亡时,流会发生什么?
在 Akka Streams (java api) 中,当 ActorSubscriber 死亡时上游会发生什么?直播会被取消吗?我使用 akka 2.4 和 akka 流实验 1.0 和 jdk 8
代码如下所示:
scala - akka-http 分块响应连接
我akka-http
用来向发送回分块响应的 http 服务发出请求。这就是相关代码的样子:
命令行中生成的输出如下所示:
数据的逻辑部分 - 在这种情况下是一个 json 以行尾符号结束\r\n
,但问题是,json 并不总是适合单个 http 响应块,如上面示例中清晰可见的那样。
我的问题是 - 我如何将传入的分块数据连接到完整的 json 中,以便生成的容器类型仍然是Source[Out,M1]
or Flow[In,Out,M2]
?我想遵循akka-stream
.
更新:值得一提的是,响应是无止境的,聚合必须实时完成
scala - Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)
我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:
现在我想要这样的东西:
但是 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 中的所有 100 个元素)并将这个新元素提供给 Sink。
我做了一些研究,发现了明确的用户定义缓冲区,但我不明白如何从 flow2 中的 flow1 访问所有 100 个元素并对其进行一些转换。有人可以解释一下吗?或者更好地发布一个简单的小例子?或两者?
scala - 使用 Spark DStream 作为 Akka 流的源的惯用方式
我正在构建一个 REST API,它在 Spark 集群中开始一些计算并以结果的分块流进行响应。给定带有计算结果的 Spark 流,我可以使用
从 Spark 发送数据。我正在使用 akka-http 发送分块的 HTTP 响应:
为简单起见,我试图先让纯文本工作,稍后再添加 JSON 编组。
但是使用 Spark DStream 作为 Akka 流的源的惯用方式是什么?我想我应该能够通过套接字来做到这一点,但由于 Spark 驱动程序和 REST 端点位于同一个 JVM 上,为此打开一个套接字似乎有点矫枉过正。
scala - Akka-Stream 实现比单线程实现慢
2015 年 10 月 30 日更新
基于 Roland Kuhn Awnser:
Akka Streams 使用 Actor 之间的异步消息传递来实现流处理阶段。跨异步边界传递数据具有您在此处看到的开销:您的计算似乎只需要大约 160ns(源自单线程测量),而流式解决方案每个元素大约需要 1µs,这主要由消息传递决定。
另一个误解是说“流”意味着并行性:在您的代码中,所有计算都在单个 Actor(映射阶段)中顺序运行,因此与原始单线程解决方案相比,没有任何好处。
为了从 Akka Streams 提供的并行性中受益,您需要有多个处理阶段,每个处理阶段执行以下任务
每个元素 1µs,另请参阅文档。
我做了一些改变。我的代码现在看起来像:
我不确定我是否完全错了,但我使用 akka-streams 的实现仍然慢得多(现在甚至比以前更慢),但我发现:如果我增加工作,例如通过做一些除法,用 akka 实现-streams 变得更快。因此,如果我做对了(否则请纠正我),我的示例中似乎有太多开销。因此,如果代码必须做繁重的工作,您只能从 akka-streams 中受益吗?
我在 scala 和 akka-stream 中都比较新。我写了一个小测试项目,它会创建一些事件,直到计数器达到特定数字。对于每个事件,正在计算事件的一个字段的阶乘。我实施了两次。一次使用akka-stream,一次没有akka-stream(单线程)并比较运行时。
我没想到:当我创建一个事件时,两个程序的运行时间几乎相同。但是如果我创建 70,000,000 个事件,那么没有 akka-streams 的实现会快得多。这是我的结果(以下数据基于 24 次测量):
- 没有 akka-streams 的单个事件: 403 (+- 2)ms
带有 akka-streams 的单个事件:444 (+-13)ms
没有 akka-streams 的 70Mio 事件:11778 (+-70)ms
- 70Mio 事件与 akka-steams : 75424(+-2959)ms
所以我的问题是:发生了什么事?为什么我的 akka-stream 实现速度较慢?
这是我的代码:
使用 Akka 实现
没有 Akka 的实现
对象单线程 {
共享功能
实施活动