2

我正在尝试构建一个简单的流程,其中一个源、一个接收器以及它们之间的两个“流”。所以像

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.ignore
  val f1 = Flow[Int].map(_ + 1)
  val f2 = Flow[Int].map(_ + 2)
  builder.addEdge(builder.add(in), f1, builder.add(out))

  // builder.addEdge(builder.add(in), f1, f2, builder.add(out)) // does not compile

}.run

注释行无法编译,但演示了我想要实现的目标。

该示例的设计是因为定义一个添加 3 的新函数或组合这些函数同样容易,但实际上这些函数要复杂得多,并且为了简单起见将它们分开。

我不想在这里进行扇出或扇入,只是一个可以在它们之间拥有任意数量的函数的直线。

谢谢。

4

2 回答 2

2

上的via方法Flow应该做你想做的(即f1 via f2)。

请参阅scaladocs

请注意,您还可以

val f = Flow[Int].
  map(_ + 1).
  map(_ + 2)

如果你想保持分离。或者,如果您将函数提取为g1and g2,您还可以

val g1 = (i: Int) => i + 1
val g2 = (i: Int) => i + 2
val f = Flow[Int].map(g1 andThen g2)

一般来说,我建议尽可能多地使用函数,并在你真正需要它们时保存流。

于 2015-07-15T06:00:20.430 回答
1

这里的问题是你需要一个 FlowShape。

您有两种使用 addEdge 的方法:

def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit

def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit

要使用构建器执行您想要执行的操作,您可以创建 2 个 FlowShape 并使用from: Outlet[T], to: Inlet[T]连接它们。

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
    val in = Source(1 to 10)
    val out = Sink.foreach(println)

//    val f1: Flow[Int, Int, Unit] = Flow[Int].map(_ + 1)
//    val f2: Flow[Int, Int, Unit] = Flow[Int].map(_ + 2)

    val f1: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
    val f2: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 2))
    builder.addEdge(builder.add(in), f1.inlet) //Source to f1 in
    builder.addEdge(f1.outlet, f2.inlet) // f1 out to f2 in
    builder.addEdge(f2.outlet, builder.add(out)) // f2 out to sink
  }.run()

我留下了类型,所以你可以看到区别。

第二种选择是使用部分图创建 FlowShape。

val partialFlow: Graph[FlowShape[Int, Int], Unit] = FlowGraph.partial() { builder =>
    val f1 = builder.add(Flow[Int].map(_ + 1))
    val f2 = builder.add(Flow[Int].map(_ + 2))
    builder.addEdge(f1.outlet, f2.inlet)

    FlowShape(f1.inlet, f2.outlet)
  }

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
    val in = Source(1 to 10)
    val out = Sink.foreach(println)
    builder.addEdge(builder.add(in), partialFlow, builder.add(out))
  }.run()
于 2015-08-07T07:28:07.597 回答