The issue here is that you need a FlowShape.
There are two ways to call addEdge:
def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit
and
def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit
To do what you want with the builder, you can create two FlowShapes (by passing each Flow to builder.add) and connect them with the from: Outlet[T], to: Inlet[T] overload.
FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.foreach(println)
  // val f1: Flow[Int, Int, Unit] = Flow[Int].map(_ + 1)
  // val f2: Flow[Int, Int, Unit] = Flow[Int].map(_ + 2)
  val f1: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
  val f2: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 2))
  builder.addEdge(builder.add(in), f1.inlet) // source to f1 in
  builder.addEdge(f1.outlet, f2.inlet) // f1 out to f2 in
  builder.addEdge(f2.outlet, builder.add(out)) // f2 out to sink
}.run()
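For comparison, the same wiring can probably be written with the arrow DSL instead of explicit addEdge calls. This is a minimal sketch, assuming the akka-stream 1.0 era API that the snippets here use (FlowGraph.closed, FlowGraph.Implicits, ActorMaterializer); the object name ArrowDslSketch and the actor system name are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, FlowGraph, RunnableGraph, Sink, Source}

// Hypothetical wrapper object, only here to make the sketch self-contained.
object ArrowDslSketch {
  implicit val system = ActorSystem("arrow-dsl-sketch")
  implicit val materializer = ActorMaterializer()

  val graph: RunnableGraph[Unit] =
    FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
      import FlowGraph.Implicits._

      val in = Source(1 to 10)
      val out = Sink.foreach[Int](println)
      val f1 = Flow[Int].map(_ + 1)
      val f2 = Flow[Int].map(_ + 2)

      // ~> adds each stage to the implicit builder and connects it
      // to the outlet of the previous stage.
      in ~> f1 ~> f2 ~> out
    }

  // The actor system is left running, so stop the JVM manually when done.
  def main(args: Array[String]): Unit = graph.run()
}
```

With this form there is no need to call builder.add on the flows yourself, which is why the FlowShape requirement never surfaces.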
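I left the type annotations in so you can see the difference between a Flow and the FlowShape that builder.add returns.
The second option is to create the FlowShape with a partial graph.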
val partialFlow: Graph[FlowShape[Int, Int], Unit] = FlowGraph.partial() { builder =>
  val f1 = builder.add(Flow[Int].map(_ + 1))
  val f2 = builder.add(Flow[Int].map(_ + 2))
  builder.addEdge(f1.outlet, f2.inlet)
  FlowShape(f1.inlet, f2.outlet)
}
FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.foreach(println)
  builder.addEdge(builder.add(in), partialFlow, builder.add(out))
}.run()
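When the two stages are strictly linear, as they are here, a graph builder may not be needed at all: the flows can be chained with via. This is a sketch of that alternative, not what the partial graph above does internally. It assumes the same akka-stream 1.0 era API; ViaSketch, combined and result are illustrative names, and the sink folds into a Vector instead of printing so that the result can be checked.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical wrapper object, only here to make the sketch self-contained.
object ViaSketch {
  implicit val system = ActorSystem("via-sketch")
  implicit val materializer = ActorMaterializer()

  // The same two stages, composed directly without a builder.
  val combined: Flow[Int, Int, Unit] =
    Flow[Int].map(_ + 1).via(Flow[Int].map(_ + 2))

  // Collect every element so the outcome can be inspected after completion.
  val result: Future[Vector[Int]] =
    Source(1 to 10)
      .via(combined)
      .runWith(Sink.fold[Vector[Int], Int](Vector.empty)(_ :+ _))

  def main(args: Array[String]): Unit = {
    println(Await.result(result, 3.seconds))
    system.shutdown() // Akka 2.3-era call (assumption); newer versions use terminate()
  }
}
```

The partial graph remains the better fit once the reusable piece has a fan-in or fan-out inside it, since via can only express a straight line.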