7

我的程序中有几个Flows,我想并行处理。全部完成后,我想触发一些动作。

一种方法是在每次完成后向 Actor 发送消息,当 Actor 验证所有流都准备好时,它就可以触发动作。

我想知道在 akka-streams Scala DSL 中是否有任何我可能会忽略的东西使它变得更加简单。

编辑:将流转换为未来将不起作用,因为正如文档所述,未来在流中发生的第一个事件之后立即完成。运行以下代码:

implicit val system = ActorSystem("Sys")
val fm = FlowMaterializer(MaterializerSettings())

def main(args: Array[String]): Unit = {
  val fut = Flow(1 second, {() => println("tick")}).toFuture(fm)

  fut.onComplete{ _ =>
    println("future completed")
  }
}

打印“tick”,然后是“future completed”,然后是无限序列的“tick”。

4

2 回答 2

9

正如评论中提到的,我相信@Eduardo 正确Flow地将Future. 考虑这个例子:

implicit val system = ActorSystem("Sys")
import system.dispatcher

val text1 = 
  """hello1world
  foobar""".stripMargin

val text2 = 
  """this1is
  a1test""".stripMargin

def flowFut(text:String) = Flow(text.split("\\s").toVector)
  .map(_.toUpperCase())
  .map(_.replace("1", ""))
  .toFuture(FlowMaterializer(MaterializerSettings()))    


val fut1 = flowFut(text1)    
val fut2 = flowFut(text2)    
val fut3 = for{
  f1 <- fut1
  f2 <- fut2
} yield {
  s"$f1, $f2"
}

fut3 foreach {println(_)}

在这里,我对每组文本行运行两个单独的转换,转换为大写并从任何文本中删除 #1。然后我将结果强制Flow为 a Future,以便我可以将结果组合成一个新的Future,然后打印出来。

于 2014-08-16T12:41:41.627 回答
0

我懂了。如果流程处理多个元素,则未来将在第一个元素之后完成。

我认为您可以使用 flow.onComplete 来完成一些承诺。例如

val promise1 = Promise[Unit]()
val promise2 = Promise[Unit]()

val flow1 = Flow(Iterator(1,2,3,4)).map(println)
val flow2 = Flow(Iterator('a,'b,'c,'d)).map(println)


flow1.onComplete(FlowMaterializer(MaterializerSettings())){
  case Success(_) =>  promise1.success()
  case Failure(e) => promise1.failure(e)
}
flow2.onComplete(FlowMaterializer(MaterializerSettings())){
  case Success(_) =>  promise2.success()
  case Failure(e) => promise2.failure(e)
}

for {
  e1<- promise1.future
  e2<- promise2.future
}{
  println(s"completed!")
}

另一方面,如果想要在每个元素的元组完成处理后做一些事情,您可能可以使用flow1.zip(flow2)来组合它们。

于 2014-08-16T23:16:02.953 回答