使用更复杂的图表时,我在让发布者和订阅者退出我的流程时遇到问题。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val flow = subscriberSource.to(someFunctionSink)
//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()
//prints true
Source.single(true).to(Sink(subscriber)).run()
但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]
FlowGraph.closed() { implicit builder =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Boolean](2))
subscriberSource ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> publisherSink
}.run()
val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???