1

我试图弄清楚如何从 scala Akka Stream 图中获得物化结果。

我正在使用"com.typesafe.akka" %% "akka-stream-experimental" % "1.0".

我查看了文档,但找不到任何示例。

所以,假设我有一个代码

val g = FlowGraph.closed() { implicit builder=>
  import FlowGraph.Implicits._

  val in = Source.apply(1 until 10)
  val plus = Flow[Int].map(_ + 10)
  val out = Sink.fold[Seq[Int], Int](Nil){
    case (acc, num) => if (num % 2 == 0) acc :+ num else acc
  }

  in ~> plus ~> out
}

val result = g.run()

我想从图中得到结果,g但它返回 Unit。如何处理?

谢谢你。

4

2 回答 2

1

编辑:您可能还想看看您是如何创建流程的......我认为RunnableGraph可能更适合您的情况,然后run返回Mat类型。

否则,请尝试查看.mapMaterializedValue(v => ...)手册本节中的调用(特别是第 40 行):

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-flows-and-basics.html#Combining_materialized_values

我认为这就是你想要做的。

编辑:我发现的这个聊天应用程序也使用了它(第 54 行):

https://github.com/jrudolph/akka-http-scala-js-websocket-chat/blob/master/backend/src/main/scala/example/akkawschat/Chat.scala

于 2015-09-15T15:09:10.443 回答
0

看来你不能从FlowGraph. 您需要在外部创建所有流程,然后在FlowGraph.

val in = Source.apply(1 until 10)
val plus = Flow[Int].map(_ + 10)
val out = Sink.fold[Seq[Int], Int](Nil) {
  case (acc, num) => if (num % 2 == 0) acc :+ num else acc
}

所以,代码看起来像

val g = FlowGraph.closed(in, plus, out)((_, _, _)) { implicit builder => (src, f, dst) =>
  import FlowGraph.Implicits._
  src ~> f ~> dst
}

我发现更简单的另一种方法是

val z = in.via(plus).toMat(out)(Keep.right) 
于 2015-09-16T13:40:43.810 回答