我们有一个 Akka 应用程序,它使用 Kafka 主题并将接收到的消息发送到 Akka Actor。我不确定我的编程方式是否使用了 Akka Streams 中内置的背压机制的所有好处。
以下是我的配置...
val control : Consumer.DrainingControl[Done]
Consumer
.sourceWitOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
.map(consumerRecord =>
val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
myActor ! Update(myAvro)
)
.via(Commiter.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
.toMat(Sink.ignore)(Consumer.DrainControl.apply)
.run()
这符合我对业务案例的期望,myActor 收到命令更新(MyAvro)
我对背压的技术概念更加恼火,据我所知,背压机制部分由接收器控制,但在这个流配置中,我的接收器只是“Sink.ignore”。所以我的 Sink 正在为 Back Pressure 做任何事情。
当 Akka Kafka Stream 提交 Kafka Topic 偏移量时,我还很好奇什么?命令发送到 MyActor 邮箱的那一刻?如果是这样,那么我如何处理诸如询问模式之类的场景,Kafka Offset 不应该在询问模式完成之前提交。
我看到一些处理手动偏移控制“plainPartitionedManualOffsetSource”、“commitablePartitionManualOffsetSource”的工厂方法,但我找不到任何示例,我可以用我的业务逻辑决定手动提交偏移吗?
作为替代配置,我可以使用类似的东西。
val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
Consumer
.plainSource(consumerSettings, Subscriptions.topics("myTopic"))
.toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
.run()
现在我可以访问 Sink.actorRef,我认为 Back Pressure 机制有机会控制 Back Pressure,自然这段代码将不起作用,因为我不知道如何在这个星座下访问“myAvro”。
谢谢回答..