2

我正在通过在内部构建图表来制作自定义水槽。这是我的代码的广泛简化,以证明我的问题:

def mySink: Sink[Int, Unit] = Sink() { implicit builder =>

    val entrance = builder.add(Flow[Int].buffer(500, OverflowStrategy.backpressure))
    val toString = builder.add(Flow[Int, String, Unit].map(_.toString))
    val printSink = builder.add(Sink.foreach(elem => println(elem)))

    builder.addEdge(entrance.out, toString.in)
    builder.addEdge(toString.out, printSink.in)

    entrance.in
}

我遇到的问题是,虽然创建具有相同输入/输出类型且只有一个类型参数且没有值参数的流是有效的,例如:(Flow[Int]在整个文档中),但仅提供两个是无效的类型参数和零值参数。

根据Flow 对象的参考文档,apply正在寻找的方法定义为

def apply[I, O]()(block: (Builder[Unit]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit]

并说

通过将 FlowGraph.Builder 传递给给定的创建函数来创建流。

create 函数应返回一对 Inlet 和 Outlet,它们对应于创建的 Flows 输入和输出端口。

当我尝试制作我认为非常简单的流程时,似乎我需要处理另一个级别的图形构建器。有没有一种更简单、更简洁的方法来创建一个 Flow 来改变它的输入和输出的类型,而不需要弄乱它的内部端口?如果这是解决此问题的正确方法,那么解决方案会是什么样子?

奖励:为什么很容易制作一个不改变其输出类型的流?

4

1 回答 1

3

如果您想同时指定流的输入和输出类型,您确实需要使用您在文档中找到的 apply 方法。但是,使用它的方式与您已经使用的方式几乎完全相同。

Flow[String, Message]() { implicit b =>
  import FlowGraph.Implicits._

  val reverseString = b.add(Flow[String].map[String] { msg => msg.reverse })
  val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

  // connect the graph
  reverseString ~> mapStringToMsg

  // expose ports
  (reverseString.inlet, mapStringToMsg.outlet)
}

不是只返回入口,而是返回一个元组,包括入口和出口。现在,我们可以将这个流程与特定的 Source 或 Sink 一起使用(例如在另一个构建器中,或直接与 runWith 一起使用)。

于 2015-05-23T08:19:27.193 回答