我开始学习Akka Streams,这是一个用于处理具有背压功能的数据的框架。该库是Akka的一部分,它把自己描述为:
Akka 是一个工具包和运行时,用于在 JVM 上构建高度并发、分布式和弹性的消息驱动应用程序。
这些能力来自 Akka actor 的本质。但是,在我看来,流处理和演员是互不相关的概念。
问题:Akka Streams 是否利用了 Akka Actor 的这些特性?如果是,您能解释一下演员如何帮助流媒体吗?
我开始学习Akka Streams,这是一个用于处理具有背压功能的数据的框架。该库是Akka的一部分,它把自己描述为:
Akka 是一个工具包和运行时,用于在 JVM 上构建高度并发、分布式和弹性的消息驱动应用程序。
这些能力来自 Akka actor 的本质。但是,在我看来,流处理和演员是互不相关的概念。
问题:Akka Streams 是否利用了 Akka Actor 的这些特性?如果是,您能解释一下演员如何帮助流媒体吗?
Akka Streams 是比 actor 更高级别的抽象。它是建立在Actor 模型之上的 Reactive Streams 的实现。它利用了所有的演员功能,因为它使用演员。
您甚至可以直接在流的任何部分直接使用演员。查看 ActorPublisher 和 ActorSubscriber。
一个很好的起点是akka 流快速入门。
是的, anActor
用于“物化” a 的每个 { Source
, Flow
, Sink
} Stream
。这意味着当您创建一个 Stream 时,在流被具体化之前实际上什么都不会发生,通常是通过.run()
方法调用。
例如,这里定义了一个 Stream:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Flow, Sink}
val stream = Source.single[String]("test")
.via(Flow[String].filter(_.size > 0))
.to(Sink.foreach{println})
即使流现在是val
没有实际发生的计算。Stream 只是计算的秘诀。要真正开始工作,Stream 需要具体化。这是一个不使用隐式来清楚显示物化如何发生的示例:
val actorSystem = ActorSystem()
val materializer = ActorMaterializer()(actorSystem)
stream.run()(materializer) //work begins
现在(至少)创建了 3 个 Actor:1 个用于Source.single
,1 个用于Flow.filter
,1 个用于Sink.foreach
. 注意:您可以使用相同materializer
的方式启动其他流
val doesNothingStream = Source.empty[String]
.to(Sink.ignore)
.run()(materializer)