0

我们有一个 Akka 应用程序,它使用 Kafka 主题并将接收到的消息发送到 Akka Actor。我不确定我的编程方式是否使用了 Akka Streams 中内置的背压机制的所有好处。

以下是我的配置...

val control : Consumer.DrainingControl[Done]
Consumer
 .sourceWitOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
 .map(consumerRecord =>
     val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
     
     val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
     
     myActor ! Update(myAvro)          
 )
 .via(Commiter.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
 .toMat(Sink.ignore)(Consumer.DrainControl.apply)
 .run()

这符合我对业务案例的期望,myActor 收到命令更新(MyAvro)

我对背压的技术概念更加恼火,据我所知,背压机制部分由接收器控制,但在这个流配置中,我的接收器只是“Sink.ignore”。所以我的 Sink 正在为 Back Pressure 做任何事情。

当 Akka Kafka Stream 提交 Kafka Topic 偏移量时,我还很好奇什么?命令发送到 MyActor 邮箱的那一刻?如果是这样,那么我如何处理诸如询问模式之类的场景,Kafka Offset 不应该在询问模式完成之前提交。

我看到一些处理手动偏移控制“plainPartitionedManualOffsetSource”、“commitablePartitionManualOffsetSource”的工厂方法,但我找不到任何示例,我可以用我的业务逻辑决定手动提交偏移吗?

作为替代配置,我可以使用类似的东西。

val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
    .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
    .run()

现在我可以访问 Sink.actorRef,我认为 Back Pressure 机制有机会控制 Back Pressure,自然这段代码将不起作用,因为我不知道如何在这个星座下访问“myAvro”。

谢谢回答..

4

2 回答 2

1

在第一个流中,基本上不会有背压。偏移量提交将在消息发送到后很快发生myActor

对于背压,您需要等待目标参与者的响应,正如您所说,询问模式是实现这一目标的规范方式。由于从演员外部询问演员(出于所有意图和目的,流在演员之外:阶段由演员执行是实现细节)导致 a Future,这表明这mapAsync是需要的。

def askUpdate(m: MyAvro): Future[Response] = ???  // get actorref from cluster sharding, send ask, etc.

然后map,您将原始流中的替换为

.mapAsync(parallelism) { consumerRecord =>
  askUpdate(consumeRecord.value().asInstanceOf[MyAvro])
}

mapAsync将“飞行中”期货限制为parallelism. 如果有parallelism期货(当然是由它产生的),它会背压。如果生成的未来以失败告终(对于请求本身,这通常是超时),它将失败;成功期货的结果(尊重传入顺序)将被传递(通常,这些将是akka.Done,特别是当流中唯一要做的事情是偏移提交和 时Sink.ignore)。

于 2020-12-03T17:47:36.843 回答
0

这种说法是不正确的:

...据我所知,背压机制部分由接收器控制,但在此 Stream 配置中,我的接收器只是“Sink.ignore”。所以我的 Sink 正在为 Back Pressure 做任何事情。

Sinks 对于背压没有什么特别之处。作为流控制机制的背压将在流中有异步边界的任何地方自动使用。可能在其中,Sink但也可能在流中的其他任何地方。

在您的情况下,您正在连接您的流以与演员交谈。那是你的异步边界,但你这样做的方式是使用map和在你用来!与演员交谈的地图内。所以没有背压,因为:

  1. map不是异步操作符,它内部的任何调用都不能参与背压机制。所以从 Akka Stream 的角度来看,没有引入异步边界。
  2. !是一劳永逸,没有提供关于演员有多忙于执行任何背压的反馈。

就像 Levi 提到的,你可以做的是从交互tell变为ask交互,并让接收参与者在其工作完成时做出响应。然后你可以mapAsync像 Levi 描述的那样使用。map和之间的区别在于,mapAsync它的语义mapAsync只有在返回Future完成时才会向下游发出。即使parallelism为 1,背压仍然有效。如果你的 Kafka 记录的速度比你的 actor 可以处理的快,mapAsync它会在等待Future完成时向上游背压。在这种特殊情况下,我认为增加parallelism没有任何意义,因为所有这些消息都将添加到演员的收件箱中,因此您不会真正通过这样做来加速任何事情。如果交互是一个 REST 调用,那么它可以提高整体吞吐量。根据您的参与者处理消息的方式,增加parallelismformapAsync可能会导致吞吐量增加。value 有效地限制了在背压开始之前允许paralleslism的最大未完成数。Future

于 2020-12-04T12:14:29.107 回答