3

使用更复杂的图表时,我在让发布者和订阅者退出我的流程时遇到问题。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)

val flow = subscriberSource.to(someFunctionSink)

//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()

//prints true
Source.single(true).to(Sink(subscriber)).run()

但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._

  val broadcast = builder.add(Broadcast[Boolean](2))

  subscriberSource ~> broadcast.in
  broadcast.out(0) ~> someFunctionSink
  broadcast.out(1) ~> publisherSink
}.run()

val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???
4

1 回答 1

8

当您调用RunnableGraph.run()流时运行,结果是该运行的“物化值”。

Source.subscriber[Boolean]在您的简单示例中,物化值为Subscriber[Boolean]。在您的复杂示例中,您希望将图形的多个组件的物化值组合成一个物化值,即 tuple (Subscriber[Boolean], Publisher[Boolean])

您可以通过将您对其物化值感兴趣的组件传递给FlowGraph.closed()然后指定一个函数来组合物化值来做到这一点:

import akka.stream.scaladsl._
import org.reactivestreams._

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

val graph =
  FlowGraph.closed(subscriberSource, publisherSink)(Keep.both) { implicit builder ⇒
    (in, out) ⇒
      import FlowGraph.Implicits._

      val broadcast = builder.add(Broadcast[Boolean](2))

      in ~> broadcast.in
      broadcast.out(0) ~> someFunctionSink
      broadcast.out(1) ~> out
  }
val (subscriber: Subscriber[Boolean], publisher: Publisher[Boolean]) = graph.run()

有关FlowGraph.closed 重载的更多信息,请参阅 Scaladocs

Keep.both函数的缩写(a, b) => (a, b)

于 2015-07-03T11:53:41.990 回答