我正在寻找SinkSource
提供 aSink
和 a 的 a Source
。如果元素流入该元素,Sink
则应在相应的Source
. 以下代码说明了我的意思:
object SinkSource {
def apply[T] = new {
def sink: Sink[T] = ???
def source: Source[T] = ???
}
}
val flowgraph = FlowGraph { implicit fgb =>
import FlowGraphImplicits._
val sinksource = SinkSource[Int]
Source(1 to 5) ~> sinksource.sink
sinksource.source ~> Sink.foreach(print)
}
implicit val actorSystem = ActorSystem(name = "System")
implicit val flowMaterializer = FlowMaterializer()
val materializedMap = flowgraph.run()
如果执行这应该打印:12345
那么,是否SinkSource
存在(在 API 中没有看到它)或者有谁知道如何实现它?我应该提到我需要独特的访问权限Sink
,Source
因此这Flow
不是这种特定形式的解决方案:
Source(1 to 5) ~> Flow[Int] ~> Sink.foreach(println)