我一直在尝试使用 reactive-kafka,但我在条件处理方面遇到了问题,对此我没有找到令人满意的答案。
基本上我正在尝试使用一个包含大量消息(每天大约 100 亿条消息)的 kafka 主题,并且根据消息的某些属性只处理其中的一些消息(每天几千条),然后将我的消息的处理版本推送到另一个主题,我正在努力正确地做到这一点。
我的第一次尝试是这样的:
// This is pseudo code.
Source(ProducerSettings(...))
.filter(isProcessable(_))
.map(process(_))
.via(Producer.flow(producerSettings))
.map(_.commitScalaDsl())
.runWith(Sink.ignore)
这种方法的问题在于,我只有在阅读我能够处理的消息时才会提交,这显然不是很酷,因为如果我必须停止并重新启动我的程序,那么我必须重新阅读一堆无用的消息,并且因为它们太多了,我不能那样做。
然后,我尝试通过以下方式使用 GraphDSL:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> commit
~> broadcast ~> isNotProcessable ~> merge
这个解决方案显然也不好,因为我无法处理的消息通过图的第二个分支并在可处理的消息真正推送到它们的目的地之前被提交,这比第一条消息更糟糕,因为它没有甚至保证至少一次交货。
有人知道我如何解决这个问题吗?