1

我正在尝试为 akka 流中的偏移量制定至少一次提交策略,但我无法理解在我的流上使用过滤器的情况下的预期模式是什么。

我的期望是,过滤后的消息都不会得到它们的偏移量,因此它们最终会陷入无限循环的处理中。

一个说明这一点的荒谬示例是过滤所有消息,如下所示:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl()) 
.runWith(Sink.ignore)

我只能看到将过滤器包装在流程中的解决方案,以检查逻辑是否会在这种情况下过滤掉并提交,但这似乎并不优雅,并且降低了过滤器形状的价值。

过滤并不是一件罕见的事情,但我看不到任何优雅的提交偏移量的方式?对我来说,框架没有办法做到这一点似乎很奇怪,所以我错过了什么?

4

1 回答 1

1

我无法使用当前的 akka 实现找到更智能的索引提交的解决方案,因此我将责任委托给 kafka 在 kafka 级别设置自动提交,并将其与应用程序的优雅关闭策略相结合因此,当蓝/绿部署发生时,所有消息都会在应用程序关闭之前处理。

  • 自动提交为真:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
  • 优雅关机:
val actorMaterializer = ActorMaterializer(
  ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
        actorMaterializer.system.terminate()
        Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}
于 2018-03-19T17:04:42.483 回答