问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
572 浏览

scala - 迭代错误/异常处理与反应式流/akka-stream

令人惊讶的是,我在 Iteratees 和错误处理方面遇到了一些问题。

问题;

从一个(来自网络,必须是 InputStream)中读取一些字节,InputStream在这个 InputStream 上做一些分块/grouing(用于工作分配),然后将其转换为case class DataBlock(blockNum: Int, data: ByteString)用于发送给参与者的转换(Array[Bytes] 转换为紧凑字节字符串)。

流量;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

编码;

问题;

我当前的 Iteratee 代码运行良好。但是,我希望能够处理任何一方的故障;

  1. 当 InputStreamread方法失败时 - 我想知道有多少字节/块已成功处理并从该点继续读取流。当在枚举器中读取时抛出错误,fut只是返回异常,没有状态,所以我不知道我在做什么,除非我将它传递给 rxing 演员(我不想这样做)
  2. 如果输出端失败或无法再接收 DataBlock 消息,因为 Actor 的缓冲区已满,保持从输入流读取

我该怎么做?

因为我需要定义的错误处理,我怎么能/我会更好地使用反应流/Akka-stream(实验性)或scalaz迭代来尝试这个?

0 投票
1 回答
526 浏览

scala - 在 akka 流中使用 mapFuture

我正在使用 Akka Streams,并且一直在尝试对从 MongoDB 集合中轮询的事件进行一些丰富和处理。但是,我对实现事件丰富器的最佳方法有一些疑问,这可能需要连接到外部数据源。

mapFuture 似乎很合适,但我遇到了一些问题:

我的应用程序:

但是,我遇到了这个错误:

在调用 mapFuture 时。

我错过了什么?

有什么更好的想法来处理这些丰富的内容?

更新 堆栈跟踪:

谢谢

0 投票
2 回答
4907 浏览

scala - Akka Streams:如何等到几个 Flows 完成

我的程序中有几个Flows,我想并行处理。全部完成后,我想触发一些动作。

一种方法是在每次完成后向 Actor 发送消息,当 Actor 验证所有流都准备好时,它就可以触发动作。

我想知道在 akka-streams Scala DSL 中是否有任何我可能会忽略的东西使它变得更加简单。

编辑:将流转换为未来将不起作用,因为正如文档所述,未来在流中发生的第一个事件之后立即完成。运行以下代码:

打印“tick”,然后是“future completed”,然后是无限序列的“tick”。

0 投票
1 回答
1843 浏览

akka - 使用 Akka-Http 流式传输视频或(未知长度的流)

我正在为一个实验项目开发 akka-http(akka-http-experimental_2.11 - 0.4)。而且我之前没有在Spray上工作过。

我想将 mp4 视频(大小可能不同)流式传输到浏览器。但我不知道如何为 HttpResponse(HttpEntity.Chunked ?)创建 HttpEntity。我尝试过像这样肮脏的东西,这不是正确的方法,但这在 Firefox 中仅适用于单个请求。

当我在另一个选项卡或浏览器中打开相同的 url 时,服务器无法处理该请求。由于这是一个实验项目,因此没有足够的文档用于大文件流式传输。

我得到了示例源代码形式https://github.com/akka/akka/blob/release-2.3-dev/akka-http-core/src/test/scala/akka/http/TestServer.scala

我需要知道如何为 HttpEntity.Chunked 创建 Producer。如果有人可以简单地解释一下,那将有助于理解 API。

谢谢你。

(PS:有人请在 Stack Overflow 中创建 Akka-Http 标签)

0 投票
0 回答
1455 浏览

scala - Akka Stream 中的异常/错误处理

我已经定义了以下管道:

和流:

增强器看起来像这样:

现在,如果满足导致 Augmenter1 中异常的条件,则流程只会在异常的第一个实例处(成功地)终止,而不会引发任何异常。我希望能够做两件事:在链上捕获异常,然后跳到下一个事件。

我的问题:处理流程中的错误/异常的正确方法是什么?

谢谢

0 投票
4 回答
4405 浏览

scala - 如何使用 Akka Streams 在分隔符上拆分入站流

我一直在玩一些实验性的 Akka Streams API,我有一个用例,我想看看如何实现。对于我的用例,我有一个StreamTcp基础Flow,它通过将连接的输入流绑定到我的服务器套接字来提供。我拥有的流程基于ByteString进入其中的数据。传入的数据将有一个分隔符,这意味着我应该将分隔符之前的所有内容视为一条消息,并将下一个分隔符之后的所有内容视为下一条消息。所以玩一个更简单的例子,不使用套接字,只使用静态文本,这就是我想出的:

Flow我发现实现我的目标的主要功能是splitWhen,然后生成额外的子流,每个.分隔符对应每个消息。然后,我使用另一个步骤管道处理每个子流,最后在最后打印各个消息。

这一切似乎有点冗长,以完成我认为非常简单和常见的用例。所以我的问题是,是否有一种更简洁、更简洁的方式来执行此操作,或者这是通过分隔符拆分流的正确和首选方式?

0 投票
1 回答
1494 浏览

scala - 测试 Akka 反应式流

我正在测试通过以下方式获得的传出流 TCP 连接流式传输消息的代码:

在我的测试中,我将结果Subscriber[ByteString]替换为虚拟订阅者,触发一些传出消息,并断言已按预期到达。我使用下面的方法来生成虚拟订阅者和未来的流结果。(到目前为止,一切都很好)

我的问题是:是否有一些规范的方法来测试流输出预期值,类似于 Akka 的TestActorRef?如果没有,是否有一些类似于上述函数的库函数?

0 投票
1 回答
412 浏览

akka - 如何为订户提供路由能力

我有以下流的路径-

我不确定如何以 akka 流的方式编写它。我试过跟随(伪)。

k1、k2、k3 是 kafka 流发布者

所以这就是我将发布者连接到接收器的方式。但是这里我要解决的问题是缓慢的 IO。IOHandler 演员处理消息的速度非常慢,所以我如何拥有多个 IOHandler 并且我应该能够分配任务。而且我还想保持背压,所以不要使用火,忘记使用路由器。

我对akka流很陌生,所以建议我一条出路。

谢谢

0 投票
2 回答
1580 浏览

scala - 为什么异常不会停止 Akka Stream 流

我有一个依赖 API 响应的流程。当响应不符合我的预期时,将引发异常。此策略非常适用于 Spray 和使用 specs2 的直接方法测试。

但是,当我尝试使用带有异常抛出模块的流时,流只会停止。

这是我的流程:

我的策略是map用于期货。

像这样:

这是一个位于完全不同位置的潜在异常引发器:

好像我错过了一些东西,但看不到什么。感谢您的任何指示。

0 投票
1 回答
484 浏览

java - 在 Scala 中替代线程执行器池

我的应用程序要求我有多个线程运行从各种 HDFS 节点获取数据。为此,我正在使用线程执行器池和分叉线程。 分叉在:

我的班级消费:

消费执行者:

但是,我想在不需要提供固定线程池大小的地方使用 Akka 流/Akka 演员,而 Akka 会处理所有事情。我对 Akka 以及 Streaming 和 actor 的概念还很陌生。有人可以以示例代码的形式给我任何线索以适合我的用例吗?提前致谢!