1

我正在寻找SinkSource提供 aSink和 a 的 a Source。如果元素流入该元素,Sink则应在相应的Source. 以下代码说明了我的意思:

object SinkSource {
  def apply[T] = new {
    def sink: Sink[T] = ???
    def source: Source[T] = ???
  }
}
val flowgraph = FlowGraph { implicit fgb =>
  import FlowGraphImplicits._
  val sinksource = SinkSource[Int]
  Source(1 to 5) ~> sinksource.sink
                    sinksource.source ~> Sink.foreach(print)
}
implicit val actorSystem = ActorSystem(name = "System")
implicit val flowMaterializer = FlowMaterializer()
val materializedMap = flowgraph.run()

如果执行这应该打印:12345
那么,是否SinkSource存在(在 API 中没有看到它)或者有谁知道如何实现它?我应该提到我需要独特的访问权限SinkSource因此这Flow不是这种特定形式的解决方案:

Source(1 to 5) ~> Flow[Int] ~> Sink.foreach(println)
4

1 回答 1

1

通常,如果已经提出问题,就会想到一些想法:事实证明,我不需要 aSink和 a SourceJunctionInPortJunctionOutPort足够了。
所以这里是:

object SinkSource {
  def apply[T](implicit fgb: FlowGraphBuilder) = new SinkSource[T]
}
class SinkSource[T](implicit fgb: FlowGraphBuilder) {
  import FlowGraphImplicits._
  private val merge = Merge[T]
  private val bcast = Broadcast[T]
  Source.empty ~> merge
  merge ~> bcast
  bcast ~> Sink.ignore
  def in: JunctionInPort[T] = merge
  def out: JunctionOutPort[T] = bcast
}
val flowgraph = FlowGraph { implicit fgb =>
  import FlowGraphImplicits._
  val source = Source(1 to 5)
  val sink = Sink.foreach(println)
  val sinkSource = SinkSource[Int]
  source ~> sinkSource.in
            sinkSource.out ~> sink
}
implicit val actorSystem = ActorSystem(name = "System")
implicit val flowMaterializer = FlowMaterializer()
val materializedMap = flowgraph.run()
于 2015-01-27T17:10:49.960 回答