我正在努力理解 akka-stream 是否在 Source 上强制执行背压时,一个分支的广播在图中需要大量时间(异步)。
我尝试查看是否在源buffer
上batch
施加了任何背压,但看起来不像。我也尝试过冲洗System.out
,但它没有改变任何东西。
object Test extends App {
/* Necessary for akka stream */
implicit val system = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.tick(0 seconds, 1 seconds, 1)
in.runForeach(i => println("Produced " + i))
val out = Sink.foreach(println)
val out2 = Sink.foreach[Int]{ o => println(s"2 $o") }
val bcast = builder.add(Broadcast[Int](2))
val batchedIn: Source[Int, Cancellable] = in.batch(4, identity) {
case (s, v) => println(s"Batched ${s+v}"); s + v
}
val f2 = Flow[Int].map(_ + 10)
val f4 = Flow[Int].map { i => Thread.sleep(2000); i}
batchedIn ~> bcast ~> f2 ~> out
bcast ~> f4.async ~> out2
ClosedShape
})
g.run()
}
当我运行程序时,我希望在控制台中看到“Batched ...”,并且在某些时候它会暂时卡住,因为 f4 的速度不足以处理这些值。目前,这些都没有像预期的那样表现,因为数字是连续生成的,没有批处理。
编辑:我注意到一段时间后,批处理消息开始在控制台中打印出来。我仍然不知道为什么它不会尽快发生,因为第一个元素应该发生背压