问题标签 [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 迭代错误/异常处理与反应式流/akka-stream
令人惊讶的是,我在 Iteratees 和错误处理方面遇到了一些问题。
问题;
从一个(来自网络,必须是 InputStream)中读取一些字节,InputStream
在这个 InputStream 上做一些分块/grouing(用于工作分配),然后将其转换为case class DataBlock(blockNum: Int, data: ByteString)
用于发送给参与者的转换(Array[Bytes] 转换为紧凑字节字符串)。
流量;
InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors
编码;
问题;
我当前的 Iteratee 代码运行良好。但是,我希望能够处理任何一方的故障;
- 当 InputStream
read
方法失败时 - 我想知道有多少字节/块已成功处理并从该点继续读取流。当在枚举器中读取时抛出错误,fut
只是返回异常,没有状态,所以我不知道我在做什么,除非我将它传递给 rxing 演员(我不想这样做) - 如果输出端失败或无法再接收 DataBlock 消息,因为 Actor 的缓冲区已满,保持从输入流读取
我该怎么做?
因为我需要定义的错误处理,我怎么能/我会更好地使用反应流/Akka-stream(实验性)或scalaz迭代来尝试这个?
scala - 在 akka 流中使用 mapFuture
我正在使用 Akka Streams,并且一直在尝试对从 MongoDB 集合中轮询的事件进行一些丰富和处理。但是,我对实现事件丰富器的最佳方法有一些疑问,这可能需要连接到外部数据源。
mapFuture 似乎很合适,但我遇到了一些问题:
我的应用程序:
但是,我遇到了这个错误:
在调用 mapFuture 时。
我错过了什么?
有什么更好的想法来处理这些丰富的内容?
更新 堆栈跟踪:
谢谢
scala - Akka Streams:如何等到几个 Flows 完成
我的程序中有几个Flow
s,我想并行处理。全部完成后,我想触发一些动作。
一种方法是在每次完成后向 Actor 发送消息,当 Actor 验证所有流都准备好时,它就可以触发动作。
我想知道在 akka-streams Scala DSL 中是否有任何我可能会忽略的东西使它变得更加简单。
编辑:将流转换为未来将不起作用,因为正如文档所述,未来在流中发生的第一个事件之后立即完成。运行以下代码:
打印“tick”,然后是“future completed”,然后是无限序列的“tick”。
akka - 使用 Akka-Http 流式传输视频或(未知长度的流)
我正在为一个实验项目开发 akka-http(akka-http-experimental_2.11 - 0.4)。而且我之前没有在Spray上工作过。
我想将 mp4 视频(大小可能不同)流式传输到浏览器。但我不知道如何为 HttpResponse(HttpEntity.Chunked ?)创建 HttpEntity。我尝试过像这样肮脏的东西,这不是正确的方法,但这在 Firefox 中仅适用于单个请求。
当我在另一个选项卡或浏览器中打开相同的 url 时,服务器无法处理该请求。由于这是一个实验项目,因此没有足够的文档用于大文件流式传输。
我需要知道如何为 HttpEntity.Chunked 创建 Producer。如果有人可以简单地解释一下,那将有助于理解 API。
谢谢你。
(PS:有人请在 Stack Overflow 中创建 Akka-Http 标签)
scala - Akka Stream 中的异常/错误处理
我已经定义了以下管道:
和流:
增强器看起来像这样:
现在,如果满足导致 Augmenter1 中异常的条件,则流程只会在异常的第一个实例处(成功地)终止,而不会引发任何异常。我希望能够做两件事:在链上捕获异常,然后跳到下一个事件。
我的问题:处理流程中的错误/异常的正确方法是什么?
谢谢
scala - 如何使用 Akka Streams 在分隔符上拆分入站流
我一直在玩一些实验性的 Akka Streams API,我有一个用例,我想看看如何实现。对于我的用例,我有一个StreamTcp
基础Flow
,它通过将连接的输入流绑定到我的服务器套接字来提供。我拥有的流程基于ByteString
进入其中的数据。传入的数据将有一个分隔符,这意味着我应该将分隔符之前的所有内容视为一条消息,并将下一个分隔符之后的所有内容视为下一条消息。所以玩一个更简单的例子,不使用套接字,只使用静态文本,这就是我想出的:
Flow
我发现实现我的目标的主要功能是splitWhen
,然后生成额外的子流,每个.
分隔符对应每个消息。然后,我使用另一个步骤管道处理每个子流,最后在最后打印各个消息。
这一切似乎有点冗长,以完成我认为非常简单和常见的用例。所以我的问题是,是否有一种更简洁、更简洁的方式来执行此操作,或者这是通过分隔符拆分流的正确和首选方式?
scala - 测试 Akka 反应式流
我正在测试通过以下方式获得的传出流 TCP 连接流式传输消息的代码:
在我的测试中,我将结果Subscriber[ByteString]
替换为虚拟订阅者,触发一些传出消息,并断言已按预期到达。我使用下面的方法来生成虚拟订阅者和未来的流结果。(到目前为止,一切都很好)
我的问题是:是否有一些规范的方法来测试流输出预期值,类似于 Akka 的TestActorRef
?如果没有,是否有一些类似于上述函数的库函数?
akka - 如何为订户提供路由能力
我有以下流的路径-
我不确定如何以 akka 流的方式编写它。我试过跟随(伪)。
k1、k2、k3 是 kafka 流发布者
所以这就是我将发布者连接到接收器的方式。但是这里我要解决的问题是缓慢的 IO。IOHandler 演员处理消息的速度非常慢,所以我如何拥有多个 IOHandler 并且我应该能够分配任务。而且我还想保持背压,所以不要使用火,忘记使用路由器。
我对akka流很陌生,所以建议我一条出路。
谢谢
scala - 为什么异常不会停止 Akka Stream 流
我有一个依赖 API 响应的流程。当响应不符合我的预期时,将引发异常。此策略非常适用于 Spray 和使用 specs2 的直接方法测试。
但是,当我尝试使用带有异常抛出模块的流时,流只会停止。
这是我的流程:
我的策略是map
用于期货。
像这样:
这是一个位于完全不同位置的潜在异常引发器:
好像我错过了一些东西,但看不到什么。感谢您的任何指示。
java - 在 Scala 中替代线程执行器池
我的应用程序要求我有多个线程运行从各种 HDFS 节点获取数据。为此,我正在使用线程执行器池和分叉线程。 分叉在:
我的班级消费:
消费执行者:
但是,我想在不需要提供固定线程池大小的地方使用 Akka 流/Akka 演员,而 Akka 会处理所有事情。我对 Akka 以及 Streaming 和 actor 的概念还很陌生。有人可以以示例代码的形式给我任何线索以适合我的用例吗?提前致谢!