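I am trying to understand why the code snippet below behaves the way it does. Because the Sink cannot produce demand faster than the Source produces content, I expected to get Dropped responses to some of the offers (the overflow strategy is set to Drop Buffer), and also an error and a QueueClosed message after the self-destruct piece.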

Snippet:

package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {

  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher

  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)

  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {

  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()

  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
   }
}

Output:

Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...

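I think my misunderstanding is around the use of getAsyncCallback in the QueueSource class. Even though the offer call in QueueSource invokes the stage logic with the correct offer details, the actual handler for this call in the stage logic is not invoked until the previous element has finished processing, so none of the logic for checking the buffer size or applying the overflow strategy is getting applied... :-/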

2 Answers

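To see the result you expect, you should add an async stage between your Source and your Sink. This tells Akka to run the two stages in two distinct actors, by forcing an asynchronous boundary between them.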

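Without the async, Akka optimizes the execution by fusing everything into one actor, which makes the processing sequential. As you noticed, in your example an offer is not handled by the queue until the Thread.sleep(150) for the previous message has completed. More information on this topic can be found here.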

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .async
    .to(Sink.foreach[Int] {...}).run()

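Also, you should add one more case when matching the result of .offer. This is the Failure of the Future itself, which is how the Future completes when the queue downstream has failed. This applies to every message offered after the first 5.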

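  // Needs `import scala.util.Success`. Here `util.Failure` is scala.util.Failure (a failed Future),
  // while the unqualified `Failure` is still akka.stream.QueueOfferResult.Failure.
  override def receive: Receive = {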
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).onComplete {
        case Success(Enqueued) => println(s"$num Enqueued ${LocalDateTime.now}")
        case Success(Dropped) => println(s"$num Dropped ${LocalDateTime.now}")
        case Success(Failure(err)) => println(s"$num Failed ${LocalDateTime.now} $err")
        case Success(QueueClosed) => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
        case util.Failure(err) => println(s"$num Failed ${LocalDateTime.now} with exception $err")
      }
  }

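Note, however, that even after doing all of the above you will not see any QueueOfferResult.Dropped results. That is because you chose the DropBuffer strategy. Every incoming message is enqueued (therefore producing an Enqueued result), kicking out the existing buffer contents. If you change the strategy to DropNew, you should start seeing some Dropped results.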
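The difference between the two strategies can be sketched with a small plain-Scala model. This is not Akka's implementation: OverflowModel, BoundedBuffer, demo and the result objects are hypothetical names invented for illustration, and the model only assumes the behaviour described above (DropBuffer clears the buffer and accepts the new element, DropNew rejects the new element).

```scala
import scala.collection.mutable

// Hypothetical model of a bounded queue; none of these names come from Akka.
object OverflowModel {
  sealed trait OfferResult
  case object Enqueued extends OfferResult
  case object Dropped extends OfferResult

  sealed trait Strategy
  case object DropBuffer extends Strategy
  case object DropNew extends Strategy

  final class BoundedBuffer[A](capacity: Int, strategy: Strategy) {
    private val buffer = mutable.Queue.empty[A]

    def offer(elem: A): OfferResult =
      if (buffer.size < capacity) {
        buffer.enqueue(elem)
        Enqueued
      } else strategy match {
        case DropBuffer =>
          // Throw away everything buffered so far, then accept the new element.
          buffer.clear()
          buffer.enqueue(elem)
          Enqueued
        case DropNew =>
          // Keep the buffer untouched and reject the new element.
          Dropped
      }

    def contents: List[A] = buffer.toList
  }

  // Offers 1, 2, 3 to a buffer of size 1 that nobody drains.
  def demo(strategy: Strategy): (List[OfferResult], List[Int]) = {
    val buf = new BoundedBuffer[Int](capacity = 1, strategy = strategy)
    val results = List(1, 2, 3).map(buf.offer)
    (results, buf.contents)
  }
}
```

In this model, demo(DropBuffer) reports Enqueued for all three offers and leaves only 3 in the buffer, whereas demo(DropNew) reports Enqueued, Dropped, Dropped and leaves 1 in the buffer. That mirrors why the offering side never sees Dropped with DropBuffer even though elements are being lost.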

answered 2016-12-28T12:07:11.690

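I found the answer to the question I asked in the comments. I think it is very relevant to the original question, so I am adding it as an answer (but the correct answer is the one from stefano).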

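What causes this behaviour is a buffer, but not one we configure explicitly, as in map.(...).buffer(1,OverflowStrategy.dropBuffer).async. It is the internal buffer that is created at materialization. This buffer exists purely for performance and is part of the blueprint optimization performed at materialization.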

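While pipelining in general increases throughput, in practice there is a significant cost to passing an element through an asynchronous (and therefore thread-crossing) boundary. To amortize this cost, Akka Streams uses a windowed, batching backpressure strategy internally. It is windowed because, as opposed to a Stop-And-Wait protocol, multiple elements might be "in flight" concurrently with requests for elements. It is also batching because a new element is not requested immediately once an element has been drained from the window buffer; instead, multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary.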

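It is no accident that the documentation on internal buffers sits next to the documentation on explicit buffers and is part of the "working with rate" section.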

BatchingActorInputBoundary holds the inputBuffer.

  /* Bad: same number of emitted and consumed events, i.e. DOES NOT DROP
  Emitted: 1
  Emitted: 1
  Emitted: 1
  Consumed: 1
  Emitted: 1
  Emitted: 1
  Consumed: 1
  Consumed: 1
  Consumed: 1
  Consumed: 1
  */
  def example1(): Unit = {
    val c = Source.tick(500 millis, 500 millis, 1)
      .map(x => {
        println("Emitted: " + x)
        x
      })
      .buffer(1, OverflowStrategy.dropBuffer).async
      .toMat(Sink.foreach[Int](x => {
        Thread.sleep(5000)
        println("Consumed: " + x)
      }))(Keep.left)
      .run
    Thread.sleep(3000)
    c.cancel()

}

The example above, which leads to the (for me!) unexpected behaviour, can be "solved" by reducing the size of the internal buffer:

      .toMat(Sink.foreach[Int](x => {
        Thread.sleep(5000)
        println("Consumed: " + x)
      }))(Keep.left).addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

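Now some of the elements from upstream are discarded, but there is still a minimal input buffer of size 1, and we obtain the following output: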

Emitted: 1
Emitted: 1
Emitted: 1
Emitted: 1
Emitted: 1
Consumed: 1
Consumed: 1
Consumed: 1

I hope this answer adds value to stefano's answer.

The Akka team is always one step ahead:

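In general, when time- or rate-driven processing stages exhibit strange behaviour, one of the first solutions to try should be to decrease the input buffer of the affected elements to 1.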

**UPDATE:**

Konrad Malawski considered this a racy solution and recommended that I implement this behaviour as a GraphStage. Here it is.

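import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Keeps only the most recent upstream element and emits it when downstream asks.
class LastElement[A] extends GraphStage[FlowShape[A, A]] {
  private val in = Inlet[A]("last-in")
  private val out = Outlet[A]("last-out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Latest element not yet pushed downstream; overwritten by newer arrivals.
    var pushPending: Option[A] = None

    // Start pulling right away, independently of downstream demand.
    override def preStart(): Unit = pull(in)

    def pushIfAvailable(): Unit = if (isAvailable(out)) {
      pushPending.foreach { p =>
        push(out, p)
        pushPending = None
      }
    }

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pushIfAvailable()
    })

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        pushPending = Some(grab(in))
        pushIfAvailable()
        pull(in) // keep draining upstream so it is never backpressured
      }
    })
  }

  override def shape: FlowShape[A, A] = FlowShape(in, out)
}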

answered 2017-12-14T14:37:28.887