我正在尝试为 akka 流中的偏移量制定至少一次提交策略,但我无法理解在我的流上使用过滤器的情况下的预期模式是什么。
我的期望是,过滤后的消息都不会得到它们的偏移量,因此它们最终会陷入无限循环的处理中。
一个说明这一点的荒谬示例是过滤所有消息,如下所示:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
我只能看到将过滤器包装在流程中的解决方案,以检查逻辑是否会在这种情况下过滤掉并提交,但这似乎并不优雅,并且降低了过滤器形状的价值。
过滤并不是一件罕见的事情,但我看不到任何优雅的提交偏移量的方式?对我来说,框架没有办法做到这一点似乎很奇怪,所以我错过了什么?