问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
829 浏览

scala - 如何结束无限的akka​​流

我是 Akka Streams 的新手,但有一个案例我想用它来寻找来自无限来源的排列。具有有限源的简化示例可能如下所示。

此示例输出:

我显然对源过去很好,42但我不想在能够得到结果之前耗尽整个流。

问题是,当我找到我要找的东西时,我应该如何结束流?

0 投票
2 回答
6261 浏览

scala - akka http:Akka 流与演员建立休息服务

当涉及在 akka http 上创建具有 60 多个 API 的 REST Web 服务时。我如何选择应该使用 akka 流还是使用 akka 演员?在他的帖子中,Jos 展示了在 akka http 上创建 API 的两种方法,但他没有展示我何时应该选择其中一种。

0 投票
1 回答
1073 浏览

scala - 使用 akka 流和喷雾将 csv 文件流式传输到浏览器

如何将 aSource[String, Unit]连接到流媒体演员?

我认为https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33StreamingActor的修改版本会很好用,但我很难连接这些部分。

鉴于source: Source[String, Unit]and ctx: RequestContext,我认为修改后的StreamingActor应该与actorRefFactory.actorOf(fromSource(source, ctx)).

供参考,上面的要点:

0 投票
1 回答
1479 浏览

debugging - 如何调试 akka 流?

当我在方法 processLine 中的某处放置断点时,调试器不会在该行停止。它的执行就像没有任何断点一样。调试akka流的流程是否有所不同,我该如何解决这个问题?

0 投票
2 回答
5395 浏览

akka - 使用 akka 流有条件地跳过流

我正在使用 akka 流,并且我有一部分图表需要有条件地跳过,因为流无法处理某些值。具体来说,我有一个接受字符串并发出 http 请求的流程,但是当字符串为空时服务器无法处理这种情况。但我只需要返回一个空字符串。有没有办法做到这一点而不必通过http请求知道它会失败?我基本上有这个:

我唯一能想到的就是在我的 httpResponse 流中捕获 400 错误并返回一个默认值。但是我希望能够避免为我知道会事先失败的请求而访问服务器的开销。

0 投票
3 回答
1713 浏览

scala - 如何对来自无限流的传入事件进行分组?

我有无限的事件流:

IE

这些事件我想按 session_uid 分组并计算每个会话的流量总和。

我编写了一个akka-streams流,可以很好地使用有限流groupBy(我的代码基于食谱中的这个示例)。但是对于无限流,它将不起作用,因为groupBy函数应该处理所有传入的流,并且只有在此之后才准备好返回结果。

我认为我应该实现超时分组,即如果我没有收到指定stream_uid 的事件超过5 分钟,我应该为这个session_uid 返回分组事件。但是如何实现它只使用akka-streams呢?

0 投票
0 回答
1043 浏览

scala - groupBy 是否在 akka-stream 中泄漏?

我想编写一个流,akka-stream用于通过 session_uid 对来自无限流的事件进行分组,并计算每个会话的流量总和(我的上一个问题中的详细信息)。

我将Source#groupBy通过 session_uid 将函数用于组事件,但似乎该函数会在内部累积所有组键并且无法释放它们。这是导致java.lang.OutOfMemoryError: Java heap space异常。这是重现它的代码:

那么,在完成相关事件流( )处理后,如何释放sessionUid里面的分组键( )呢?groupBysessionEvents

可能有人知道基于 session_uid 对事件进行分组的另一种方式akka-stream吗?

0 投票
0 回答
447 浏览

scala - 如何手动创建 Source 并将元素推送到它?

我想创建自定义StatefulStage,它应该像groupBy方法和发出Source[A, Unit]元素一样工作,但我不明白如何创建实例Source[A, Unit]并将传入元素推送到它。这是存根:

您可以将以下代码段用于测试 GroupBy 流(它应该从创建的流中打印事件):

谁能解释我该怎么做?

更新:

我根据这个答案编写了以下onPush方法,但它没有打印事件。据我了解,只有当它作为流的一部分运行但我想在测试片段之外运行流时,我才能将元素推送到源。如果我像本例中那样运行流程,那么它将处理事件并将它们发送到. 我认为这是我的测试片段没有打印事件的原因。GroupByGroupBySink.ignore

那么,如何解决呢?

0 投票
0 回答
705 浏览

akka-stream - 长管道中的 Akka 流超时

如果我有一个很长的 akka-stream 管道,有没有办法处理超时,直到第一个元素到达管道中的给定位置才开始超时?

例如,假设我有一个管道,其中第一个元素需要 2 分钟以上才能到达最终接收器,但在那之后,元素应该每隔一秒左右进入。这是akka考虑到的吗?或者在这种情况下我是否必须单独为我的图形形状设置超时?

0 投票
1 回答
170 浏览

scala - 用于增长列表的 scalaz 流结构

我有一种预感,我可以(应该?)使用 scalaz-streams 来解决我的问题,就像这样。

我有一个起始项目 A。我有一个接受 A 并返回 A 列表的函数。

我有一个以 1 项(起始项)开头的工作队列。当我们处理 ( doSomething) 每个项目时,它可能会将许多项目添加到同一工作队列的末尾。然而,在某些时候(在数百万个项目之后),我们doSomething处理的每个后续项目将开始向工作队列添加越来越少的项目,最终不会添加新项目(doSomething 将为这些项目返回 Nil)。这就是我们知道计算最终将终止的方式。

假设 scalaz-streams 适用于此,请给我一些提示,说明我应该考虑哪些整体结构或类型来实现它?

一旦完成了使用单个“worker”的简单实现,我还想使用多个 worker 并行处理队列项,例如拥有 5 个 worker 池(每个 worker 将其任务分配给代理进行计算doSomething)所以我需要在这个算法中处理效果(比如工人失败)。