0

我正在努力理解 akka-stream 是否在 Source 上强制执行背压时,一个分支的广播在图中需要大量时间(异步)。

我尝试查看是否在源bufferbatch施加了任何背压,但看起来不像。我也尝试过冲洗System.out,但它没有改变任何东西。

object Test extends App {
/* Necessary for akka stream */
implicit val system = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val in = Source.tick(0 seconds, 1 seconds, 1)
        in.runForeach(i => println("Produced " + i))

    val out = Sink.foreach(println)
    val out2 = Sink.foreach[Int]{ o => println(s"2 $o") }

    val bcast = builder.add(Broadcast[Int](2))

    val batchedIn: Source[Int, Cancellable] = in.batch(4, identity) {
        case (s, v) => println(s"Batched ${s+v}"); s + v
    }

    val f2 = Flow[Int].map(_ + 10)
    val f4 = Flow[Int].map { i => Thread.sleep(2000); i}

    batchedIn ~> bcast ~> f2 ~> out
                 bcast ~> f4.async ~> out2
    ClosedShape
})

g.run()
}

当我运行程序时,我希望在控制台中看到“Batched ...”,并且在某些时候它会暂时卡住,因为 f4 的速度不足以处理这些值。目前,这些都没有像预期的那样表现,因为数字是连续生成的,没有批处理。

编辑:我注意到一段时间后,批处理消息开始在控制台中打印出来。我仍然不知道为什么它不会尽快发生,因为第一个元素应该发生背压

4

1 回答 1

1

解释这种行为的原因是 akka 在设置异步边界时引入的内部缓冲区。

异步运算符的缓冲区

使用异步运算符时作为优化引入的内部缓冲区。


虽然流水线通常会增加吞吐量,但实际上存在通过异步(因此线程交叉)边界传递元素的成本,这很重要。为了摊销这个成本,Akka Streams 在内部使用了一个窗口化的批处理背压策略。它是窗口化的,因为与 Stop-And-Wait 协议相反,多个元素可能与元素请求同时“进行中”。它也是批处理,因为一旦从窗口缓冲区中耗尽了一个元素,就不会立即请求一个新元素,而是在耗尽多个元素后请求多个元素。这种批处理策略降低了通过异步边界传播背压信号的通信成本。

我知道这是一个玩具流,但如果你解释你的目标是什么,我会尽力帮助你。

你需要mapAsync而不是async

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import akka.stream.scaladsl.GraphDSL.Implicits._

  val in = Source.tick(0 seconds, 1 seconds, 1).map(x => {println(s"Produced ${x}"); x})

  val out = Sink.foreach[Int]{ o => println(s"F2 processed $o") }
  val out2 = Sink.foreach[Int]{ o => println(s"F4 processed $o") }

  val bcast = builder.add(Broadcast[Int](2))

  val batchedIn: Source[Int, Cancellable] = in.buffer(4,OverflowStrategy.backpressure)

  val f2 = Flow[Int].map(_ + 10)
  val f4 = Flow[Int].mapAsync(1) { i => Future { println("F4 Started Processing"); Thread.sleep(2000); i }(system.dispatcher) }

  batchedIn ~> bcast ~> f2 ~> out
  bcast ~> f4 ~> out2
  ClosedShape
}).run()
于 2018-12-29T15:35:16.973 回答