问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
70 浏览

scala - Akka Stream:无法处理来自测试的文件,在主类中工作正常

这就是我的代码的样子

当我运行它时,我得到的输出为

现在我为它写测试

当我运行这个测试时,我看到

并且没有生成输出,我错过了什么?

0 投票
0 回答
1306 浏览

scala - 如何调试 Akka-Stream 管道?

我正在尝试为日志行看起来像的日志文件构建处理管道

对于这个特定的日志文件,我有大约193705条要处理的日志行。

我创建了一个最初看起来像的流程图

但后来我意识到我在 Sink 中变得越来越少

所以我把我的图表做成线性的,以确保所有的线都通过管道。我当前的代码看起来像

当我再次运行我的程序时,它再次生成接近上述数字的行(不完全相同)

我很困惑,想在这里得到一些帮助

  1. 流不会失败或抛出异常,但仍会生成更少的输出行,我该如何调试?
  2. 1500maximumFrameLength,但是日志行可以大于(更多字符)。这会是个问题吗?那我该如何解决这种情况呢?
  3. 我如何确认?Materializer根据我的代码,当我打电话时我没有回来 run(),所以我无法关闭 ActorSystem,我在那里错过了什么?

更新

我刚刚发现它停止了有2564 chars(bytes)

所以它停止line 150712564 bytes

但为什么它不抛出异常?我该如何处理?

0 投票
0 回答
202 浏览

java - 应该在哪里为 ActorPublisher 指定 supervisorStrategy?

我创建了ActorPublisher一个源流。我SupervisionStrategy为我的流的 Materializer 设置:

但它从不使用 created 策略,何时ActorPublisher抛出异常。我也尝试supervisorStrategy()MyActorPublisher. 但我知道它只用于儿童演员,它不起作用。

0 投票
1 回答
833 浏览

python - Scala读取连续的http流

如何连接并读取 scala 中的连续(分块)http 流?例如,如果我有这个用 python/bottle 编写的简单服务:

我打算用它akka-stream来处理数据,我只需要一种方法来检索它。

0 投票
1 回答
2979 浏览

scala - akka-http 发送连续的分块 http 响应(流)

我有这个带有akka-http客户端和服务器的粗略测试示例。

服务器.scala:

客户端.scala:

目前Server只回复一个“测试”。

如何更改输入HttpResponseServer每 1 秒在无限循环中将“测试”作为分块(流)发送?

0 投票
0 回答
61 浏览

java - 当 actorSubscriber 死亡时,流会发生什么?

在 Akka Streams (java api) 中,当 ActorSubscriber 死亡时上游会发生什么?直播会被取消吗?我使用 akka 2.4 和 akka 流实验 1.0 和 jdk 8

代码如下所示:

0 投票
3 回答
3326 浏览

scala - akka-http 分块响应连接

akka-http用来向发送回分块响应的 http 服务发出请求。这就是相关代码的样子:

命令行中生成的输出如下所示:

数据的逻辑部分 - 在这种情况下是一个 json 以行尾符号结束\r\n,但问题是,json 并不总是适合单个 http 响应块,如上面示例中清晰可见的那样。

我的问题是 - 我如何将传入的分块数据连接到完整的 json 中,以便生成的容器类型仍然是Source[Out,M1]or Flow[In,Out,M2]?我想遵循akka-stream.

更新:值得一提的是,响应是无止境的,聚合必须实时完成

0 投票
1 回答
3665 浏览

scala - Akka-Streams 收集数据(Source -> Flow -> Flow (collect) -> Sink)

我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:

现在我想要这样的东西:

但是 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 中的所有 100 个元素)并将这个新元素提供给 Sink。

我做了一些研究,发现了明确的用户定义缓冲区,但我不明白如何从 flow2 中的 flow1 访问所有 100 个元素并对其进行一些转换。有人可以解释一下吗?或者更好地发布一个简单的小例子?或两者?

0 投票
2 回答
1886 浏览

scala - 使用 Spark DStream 作为 Akka 流的源的惯用方式

我正在构建一个 REST API,它在 Spark 集群中开始一些计算并以结果的分块流进行响应。给定带有计算结果的 Spark 流,我可以使用

从 Spark 发送数据。我正在使用 akka-http 发送分块的 HTTP 响应:

为简单起见,我试图先让纯文本工作,稍后再添加 JSON 编组。

但是使用 Spark DStream 作为 Akka 流的源的惯用方式是什么?我想我应该能够通过套接字来做到这一点,但由于 Spark 驱动程序和 REST 端点位于同一个 JVM 上,为此打开一个套接字似乎有点矫枉过正。

0 投票
2 回答
2984 浏览

scala - Akka-Stream 实现比单线程实现慢

2015 年 10 月 30 日更新


基于 Roland Kuhn Awnser:

Akka Streams 使用 Actor 之间的异步消息传递来实现流处理阶段。跨异步边界传递数据具有您在此处看到的开销:您的计算似乎只需要大约 160ns(源自单线程测量),而流式解决方案每个元素大约需要 1µs,这主要由消息传递决定。

另一个误解是说“流”意味着并行性:在您的代码中,所有计算都在单个 Actor(映射阶段)中顺序运行,因此与原始单线程解决方案相比,没有任何好处。

为了从 Akka Streams 提供的并行性中受益,您需要有多个处理阶段,每个处理阶段执行以下任务

每个元素 1µs,另请参阅文档。

我做了一些改变。我的代码现在看起来像:

我不确定我是否完全错了,但我使用 akka-streams 的实现仍然慢得多(现在甚至比以前更慢),但我发现:如果我增加工作,例如通过做一些除法,用 akka 实现-streams 变得更快。因此,如果我做对了(否则请纠正我),我的示例中似乎有太多开销。因此,如果代码必须做繁重的工作,您只能从 akka-streams 中受益吗?




我在 scala 和 akka-stream 中都比较新。我写了一个小测试项目,它会创建一些事件,直到计数器达到特定数字。对于每个事件,正在计算事件的一个字段的阶乘。我实施了两次。一次使用akka-stream,一次没有akka-stream(单线程)并比较运行时。

我没想到:当我创建一个事件时,两个程序的运行时间几乎相同。但是如果我创建 70,000,000 个事件,那么没有 akka-streams 的实现会快得多。这是我的结果(以下数据基于 24 次测量):


  • 没有 akka-streams 的单个事件403 (+- 2)ms
  • 带有 akka-streams 的单个事件444 (+-13)ms


  • 没有 akka-streams 的 70Mio 事件11778 (+-70)ms

  • 70Mio 事件与 akka-steams : 75424(+-2959)ms

所以我的问题是:发生了什么事?为什么我的 akka-stream 实现速度较慢?

这是我的代码:

使用 Akka 实现

没有 Akka 的实现

对象单线程 {

共享功能

实施活动