问题标签 [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 如何在akka流中使用mapAsync使用分组子流
我需要做一些与此非常相似的事情https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
我的问题是我有一个未知数量的组,如果 mapAsync 的并行数少于我得到的组数和最后一个接收器中的错误
由于上游错误(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)而拆除 SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)
我尝试按照 akka 流的模式指南http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html中的建议在中间放置一个缓冲区
但结果相同
scala - 如何处理带有内部排队反应流订阅者的 OnComplete 消息?
我正在使用带有简单反应流的 Akka-Stream 1.0:
- 发布者发送 N 条消息
- 订阅者消费了 N 条消息
和
发布者将通过发送OnComplete
消息在 N 条消息(动态)后关闭流。
订阅者收到消息并立即进入canceled
状态。问题是,订阅者需要一些时间来处理每条消息,这意味着我通常会积压一些消息 - 随着订阅者的获取,这些消息无法再处理canceled
- 恕我直言ActorSubscriber.scala:195
处理消息意味着我的订阅者会将工作卸载给其他人(通过 Spray 发送内容ChunkedMessage
)并在消息完成后立即收到确认消息。由于 Actor 被取消,ack 消息永远不会被处理,并且 backlog 会被处理。
有什么建议让我完成积压工作?我可以“发明”我自己的“完成标记”,但这对我来说听起来很奇怪。显然,我的代码MaxInFlightRequestStrategy
最多可以使用 1 - 因为那里的需求总是只有 1 - 这意味着我从来没有积压的消息。
scala - 自定义 Supervision.Decider 不会捕获 ActorPublisher 产生的异常
我正在构建一个将由第 3 方使用的库。在我的一种方法中,我返回Stream[Item]
的是从分页 REST API 调用的结果异步生成的。
我正在使用我对BulkPullerAsync的修改。我的代码在这里。
我希望我的流的接收者能够处理错误。根据文档,我应该使用 custom Supervision.Decider
。
不幸的是,它没有捕捉到我的 ActionPublisher 中抛出的异常。我看到它已处理,ActorPublisher.onError
已调用但未达到Supervision.Decider
. 它适用于文档中提供的简单 Stream。
如果我使用,错误也不会到达演员Sink.actorRef
。
我应该怎么办 ?我希望我的用户Stream
不应该依赖于其实施的性质。
UPD:为了实验,我尝试了以下示例
在这种情况下,异常被Decider
.
UPD2:我试图通过Publisher
从不同类型的来源生成然后将它们转换回Source
预期所有错误都会Subscriber.onError
发生来欺骗它:)所以显然混合ActorPublisher + Decider有问题......
总的来说,我认为这是不一致的行为。我不能使用一种机制来处理Stream
.
scala - 基于发布者的源不输出元素
我基于这样的 ReactiveStreams Publisher 为 Akka Stream 创建了一个 Source:
它将向 Flickr 发出搜索请求,并将结果作为JsValue
s 获取。我试图将它连接到许多不同的流和接收器,但这将是最基本的设置:
我看到onNext
被调用了几次,然后是onComplete
. 但是,Sink 没有收到任何东西。我错过了什么,这不是创建源的有效方法吗?
scala - Akka HTTP ambiguous implicit conversion
I'm cutting my teeth on Akka HTTP by working this example. For the purposes of learning, I converted it to a Maven project. However, I'm getting compilation errors as follows using Akka v2.3.12 and Akka Stream v1.0. The POST DSL fails with similar errors that I'm not posting for brevity. How can I get the example to run?
akka-stream - 使用 Source.actorPublisher 和 FlowGraph 时在 akka-stream 1.0 中获取 actorRef
我的问题与某处有关:访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef 有一些区别:
- 我正在使用 akka-stream 实验 1.0
- 我正在使用 actorPublisher 模型
- 我正在使用 FlowGraph dsl 进行并行处理的流定义
我找不到让 actorRef 向 Source 持有的 Actor Publisher 实例发送消息的方法。
感谢帮助 !
奥利弗
scala - 在 akka-stream 中,如何从期货集合中创建无序 Source
我需要akka.stream.scaladsl.Source[T, Unit]
从Future[T]
.
例如,有一个返回整数的期货集合,
如何创建一个
从中。
我不能使用Future.sequence
组合器,从那时起我会等待每个未来完成,然后再从源头获取任何东西。一旦任何未来完成,我想以任何顺序获得结果。
我知道这Source
是一个纯粹的功能性 API,它不应该在以某种方式实现它之前运行任何东西。所以,我的想法是使用一个Iterator
(这是懒惰的)来创建一个源:
但这将是期货的来源,而不是实际价值的来源。我也可以阻止next
使用Await.result(future)
,但我不确定哪个线程池的线程会被阻止。这也将按顺序调用期货,而我需要并行执行。
更新 2:事实证明有一种更简单的方法(感谢 Viktor Klang):
更新:这是我根据@sschaef 的回答得到的:
scala - 为什么 Akka 流循环没有在此图中结束?
我想创建一个在下沉之前循环 n 次的图表。我刚刚创建了这个满足我要求的示例,但在下沉后并没有结束,我真的不明白为什么。有人可以启发我吗?
谢谢。
scala - Scala Akka Stream:如何通过 Seq
我正在尝试将一些阻塞调用包装在Future
. 返回类型是Seq[User]
where User
is a case class
。以下内容无法与存在各种重载版本的投诉一起编译。有什么建议么?我尝试了几乎所有的变化都Source.apply
没有任何运气。
akka - 关闭 Akka 流以进行资源清理
使用 Akka Streams 时,是否有任何方法可以关闭/关闭资源清理不再需要的流?
编辑:当源由无限流组成时,它可能永远不会完成,我想在完成源之前停止它。
示例用法:
有没有办法关闭流?