问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
356 浏览

java - Java-Akka:组合来自多个参与者的消息

我是阿卡的新手。我有4个演员。

Actor1 向 Actor4 发送 Message1。Actor2 将 Message2 发送给 Actor4。Actor3 向 Actor4 发送 Message3。

Actor4 应该在收到 1 条 Message1 类型的消息、1 条 Message2 类型的消息和 2 条 Message3 类型的消息时创建 Message4。

消息 4 =(消息 1、消息 2、消息 3a、消息 3b、消息 3c)

这样做的最佳方法是什么?

0 投票
1 回答
1427 浏览

scala - 如何从广播的 Akka 流中获取订阅者和发布者?

使用更复杂的图表时,我在让发布者和订阅者退出我的流程时遇到问题。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。

但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?

0 投票
1 回答
1885 浏览

scala - 创建具有相同演员的演员发布者和演员订阅者

我是 akka 流的新手。我使用 kafka 作为源(使用 ReactiveKafka 库)并通过流对数据进行一些处理,并使用订阅者(EsHandler)作为接收器。

现在我需要处理错误并通过错误处理程序将其推送到不同的 kafka 队列。我正在尝试将 EsHandler 用作发布者和订阅者。我不确定如何将 EsHandler 作为中间人而不是接收器。

这是我的代码:

0 投票
1 回答
127 浏览

scala - 将数据从演员流式传输到播放结果

我有数据以块的形式到达演员,并希望将这些块作为流返回Play Result。由于获得响应的唯一方法Ok.stream看起来像是理想的候选人,因此如下所示:

我会Enumerator[Array[Byte]]从我的演员那里返回一个,然后在演员内部继续将块推入枚举器,因为消息到达演员。然而:从一个演员返回一个可变的枚举器绝对看起来像是某种违反。

有没有更合适的方法来做到这一点?我想要么akka-streamakka.io将是可能解决问题空间的抽象,但我看不出它们将如何应用。

0 投票
2 回答
4945 浏览

postgresql - 使用 slick 的 3.0.0 流媒体结果和 Postgresql 的正确方法是什么?

我正在尝试弄清楚如何使用流畅的流媒体。我使用 slick 3.0.0 和 postgres 驱动程序

情况如下:服务器必须将客户端数据序列拆分为受大小(以字节为单位)限制的块。所以,我写了以下精巧的查询:

我将 seq 与 akka-streams 结合起来Source,编写了 custom PushPullStage,它限制了数据的大小(以字节为单位)并在达到大小限制时完成上游。它工作得很好。问题是 - 当我查看 postgres 日志时,我看到这样的查询 select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

因此,乍一看,似乎有很多(并且不必要的)数据库查询正在进行,只是在每个查询中使用了几个字节。使用 Slick 进行流式传输以最小化数据库查询并充分利用每个查询中传输的数据的正确方法是什么?

0 投票
2 回答
259 浏览

scala - 在构建 FlowGraph 时,如何将两个 Flow 连接起来?

我正在尝试构建一个简单的流程,其中一个源、一个接收器以及它们之间的两个“流”。所以像

注释行无法编译,但演示了我想要实现的目标。

该示例的设计是因为定义一个添加 3 的新函数或组合这些函数同样容易,但实际上这些函数要复杂得多,并且为了简单起见将它们分开。

我不想在这里进行扇出或扇入,只是一个可以在它们之间拥有任意数量的函数的直线。

谢谢。

0 投票
2 回答
10167 浏览

scala - 使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取

我正在尝试了解如何使用新akka.http库。我想向服务器发送一个 http 请求,并将整个响应正文作为单个字符串读取,以便生成一个Source[String,?].

这是迄今为止我能够产生的最佳解决方案:

它似乎运行良好(除了缺少的错误路径),但对于这样简单的任务来说有点笨拙。有更聪明的解决方案吗?我可以避免grouped/mkString吗?

0 投票
1 回答
726 浏览

scala-2.11 - How do i create a TCP receiver that only consumes messages using akka streams?

We are on: akka-stream-experimental_2.11 1.0.

Inspired by the example

We wrote a TCP receiver as follows:

However, our intention was to have the receiver not respond at all and only sink the message. (The TCP message publisher does not care about response ).

Is it even possible? to not respond at all since akka.stream.scaladsl.Tcp.IncomingConnection takes a flow of type: Flow[ByteString, ByteString, Unit]

If yes, some guidance will be much appreciated. Thanks in advance.

One attempt as follows passes my unit tests but not sure if its the best idea:

0 投票
2 回答
697 浏览

scala-2.11 - 如何从 akka.stream.io.Framing$FramingException 中恢复

开启:akka-stream-experimental_2.11 1.0。

我们在 Tcp 服务器中使用 Framing.delimiter。当消息到达的长度大于 maximumFrameLength 时,会抛出 FramingException,我们可以从 ActorSubscriber 的 OnError 中捕获它。

服务器代码:

订阅者代码:

问题:在该传入连接的此异常之后,低于最大帧长度的消息不会流动。杀死客户端并重新运行它工作正常。

ActorSubscriber 不尊重监督

跳过坏消息并继续下一条好消息的正确方法是什么?

0 投票
1 回答
566 浏览

scala - Akka:使用物化值转换接收器的输入

我有一个接收器:Sink[String, Mat]并想将其转换为接收器:Sink[Int, Mat]通过映射每个元素:num: Int => ("num" + num): String并保留原始的物化类型和值。

如果MatUnit,那么很容易:

但是为任何人改造一个水槽Mat呢?