2

我想创建一个在下沉之前循环 n 次的图表。我刚刚创建了这个满足我要求的示例,但在下沉后并没有结束,我真的不明白为什么。有人可以启发我吗?

谢谢。

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, UniformFanOutShape}

    import scala.concurrent.Future

    object test {
      def main(args: Array[String]) {
        val ignore: Sink[Any, Future[Unit]] = Sink.ignore
        val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
          sink => {
            import FlowGraph.Implicits._

            val fileSource = Source.single((0, Array[String]()))
            val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
            val afterMerge = Flow[(Int, Array[String])].map {
              e =>
                println("after merge")
                e
            }
            val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
            val toRetry = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("retry " + (r < 3) + " " + r)
                r < 3
              }
            }.map {
              case (r, s) => (r + 1, s)
            }
            val toSink = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("sink " + (r >= 3) + " " + r)
                r >= 3
              }
            }
            merge.preferred <~ toRetry <~ broadcastArray
            fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
          }
        }
        implicit val system = ActorSystem()
        implicit val _ = ActorMaterializer()
        val run: Future[Unit] = closed.run()
        import system.dispatcher
        run.onComplete {
          case _ => {
            println("finished")
            system.shutdown()
          }
        }
      }
    }`
4

1 回答 1

13

Stream 永远不会完成,因为合并永远不会发出完成的信号。

格式化图形结构后,它基本上看起来像:

//ignoring the preferred which is inconsequential

fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
              merge <~ toRetry    <~ broadcastArray

未完成的问题根源于您的合并步骤:

// 2 inputs into merge

fileSource ~> merge 
              merge <~ toRetry

一旦 fileSource 发出了它的单个元素(即(0, Array.empty[String])),它就会发出一条complete消息进行合并。

但是,fileSource 的完成消息在合并时被阻止。从文档中:

akka.stream.scaladsl.MergePreferred

当所有上游完成 (eagerClose=false) 或一个上游完成 (eagerClose=true) 时完成

在其所有输入流完成 complete之前,合并不会发送出去。

// fileSource is complete ~> merge 
//                           merge <~ toRetry is still running

// complete fileSource + still running toRetry = still running merge

因此,合并将等到toRetry也完成。但toRetry永远不会完成,因为它正在等待merge完成。

如果您希望您的特定图表在 fileSource 完成后完成,那么只需设置eagerClose=True这将导致在 fileSource 完成后完成合并。例如:

//Add this true                                             |
//                                                          V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")

没有流循环

您的问题存在一个更简单的解决方案。只需使用一个Flow.map利用尾递归函数的阶段:

//Note: there is no use of akka in this implementation

type FileInputType = (Int, Array[String])

@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType = 
  fileInput match { 
    case (r,_) if r >= 3  => fileInput
    case (r,a)            => recursiveRetry((r+1, a))
  }    

然后您的流将减少到

//ring-fenced akka code

val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry

fileSource ~> recursiveRetryFlow ~> toSink ~> sink

结果是更清晰的流,并且避免了将“业务逻辑”与 akka 代码混合。这允许完全独立于任何第三方库的重试功能的单元测试。您在流中嵌入的重试循环是“业务逻辑”。因此,无论好坏,混合实现都与 akka 紧密耦合。

此外,在隔离解决方案中,循环包含在尾递归函数中,这是惯用的 Scala

于 2015-11-27T17:58:50.887 回答