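I am trying to understand why the code snippet below behaves the way it does. Because the Sink cannot signal demand faster than the Source produces elements, I expected to get Dropped responses to some of the offers (the overflow strategy is set to dropBuffer), and also an error and a QueueClosed message after the self-destruct piece (the exception thrown when i == 5).
The snippet: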
package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {
  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher

  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])

  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)

  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {
  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()

  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
  }
}
Output:
Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...
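I think my misunderstanding is around the use of getAsyncCallback in the QueueSource class. Even though the offer call in QueueSource invokes the stage logic with the correct offer details, the actual handler for it in the stage logic does not get called until the previous element has finished processing, so none of the logic for checking the buffer size or applying the overflow strategy is getting applied... :-/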
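
One way I could test this guess is to put an asynchronous boundary between the queue source and the slow sink, so the two no longer share one execution context. The sketch below is only an experiment under that assumption: the object name AsyncBoundaryExperiment, the loop of 40 offers and the shortened println messages are mine, and the only change to the stream itself is the added .async call. I have not verified what it prints. Two caveats as I understand them: the boundary has its own internal buffer, so any effect may not show up for the first few elements, and dropBuffer may still report Enqueued for the new element while discarding what was already buffered, so the thing to watch for is gaps in the numbers that reach the sink rather than only Dropped results.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

object AsyncBoundaryExperiment extends App {
  implicit val system = ActorSystem("AsyncBoundaryExperiment")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  // Same queue settings as in the snippet above; only .async is new.
  val queue = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .async // assumption: lets the queue stage handle offers while the sink is blocked
    .to(Sink.foreach[Int] { i =>
      Thread.sleep(150) // slow consumer, as in the original snippet
      println(s"$i sunk")
    })
    .run()

  // Offer roughly three times faster than the sink consumes,
  // printing the result of each offer as soon as its Future completes.
  (1 to 40).foreach { n =>
    queue.offer(n).foreach(result => println(s"$n $result"))
    Thread.sleep(50)
  }

  // Shut down once all offers have been made; elements still in flight are abandoned.
  system.terminate()
}
```

If the offer results now come back promptly instead of only after the previous element has finished sinking, that would support the idea that the blocked sink was holding up the async callback handler.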