
I've been experimenting with reactive-kafka, and I'm running into a problem with conditional processing for which I haven't found a satisfying answer.

Basically, I'm trying to consume a Kafka topic that carries a very large number of messages (around 10 billion a day), process only a small fraction of them (a few thousand a day) based on some property of the message, and then push the processed version of each selected message to another topic. I'm struggling to do this properly.

My first attempt looked like this:

// This is pseudo code.
Consumer.committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .filter(msg => isProcessable(msg.record.value))
    .map(msg => ProducerMessage.Message(
      new ProducerRecord("target-topic", process(msg.record.value)),
      msg.committableOffset))
    .via(Producer.flow(producerSettings))
    .mapAsync(1)(_.message.passThrough.commitScaladsl())
    .runWith(Sink.ignore)

The problem with this approach is that I only commit when I read a message I'm able to process, which is clearly not acceptable: if I have to stop and restart my program, I have to re-read a pile of useless messages, and there are far too many of them for that to be viable.

I then tried to use the GraphDSL, along these lines:

in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> commit
      broadcast ~> isNotProcessable                        ~> merge

This solution is clearly not good either: messages I can't process travel through the second branch of the graph and get committed before the processable messages have actually been pushed to their destination. In a way this is even worse than my first attempt, because it doesn't even guarantee at-least-once delivery.
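
In case it's useful, the wiring looked roughly like this (simplified; Msg, isProcessable, process and producerFlow stand in for my real types and stages):

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge}

// Sketch only: Msg, isProcessable, process and producerFlow are placeholders.
val conditionalFlow: Flow[Msg, Msg, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val broadcast = b.add(Broadcast[Msg](2))
    val merge     = b.add(Merge[Msg](2))

    broadcast.out(0).filter(isProcessable).map(process).via(producerFlow) ~> merge.in(0)
    broadcast.out(1).filter(m => !isProcessable(m))                       ~> merge.in(1)

    FlowShape(broadcast.in, merge.out)
  })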

Does anybody know how I could solve this problem?


1 Answer


A way I've used to solve a similar problem before is to exploit sequence numbers to guarantee ordering.

For instance, you could build a flow like the one you describe, minus the commit:

in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> out
      broadcast ~> isNotProcessable                        ~> merge

You would then wrap it in an order-preserving flow like this one (taken from a library we developed at my company): OrderPreservingFlow. The resulting flow could then be sent to the committer sink.
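
I can't paste that library's code here, but the gist can be sketched with statefulMapConcat, assuming a seqNr accessor for the sequence number injected upstream (the linked OrderPreservingFlow is presumably more robust about bounds and failures):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.collection.mutable

// Sketch of an order-preserving flow: stash elements that arrive ahead of
// their turn, then release every consecutive element once the gap is filled.
// T and seqNr are assumptions; the stash is unbounded in this sketch.
def orderPreserving[T](seqNr: T => Long, first: Long = 0L): Flow[T, T, NotUsed] =
  Flow[T].statefulMapConcat { () =>
    var expected = first
    val stash = mutable.Map.empty[Long, T]
    elem => {
      stash(seqNr(elem)) = elem
      val ready = List.newBuilder[T]
      while (stash.contains(expected)) {
        ready += stash.remove(expected).get
        expected += 1
      }
      ready.result()
    }
  }

Since elements leave this flow strictly in their original order, committing their offsets downstream is safe.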

If your processing stage guarantees ordering, you can be even more efficient and avoid any buffering by embedding the logic directly in your graph:

in ~> injectSeqNr ~> broadcast ~> isProcessable    ~> process ~> producer ~> mergeNextSeqNr ~> commit
                     broadcast ~> isNotProcessable                        ~> mergeNextSeqNr

Here mergeNextSeqNr is just a modified merge stage where, if input is available on a port, it is emitted immediately when its sequence number is the expected one; otherwise the stage waits for data to become available on the other port.
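
In case it helps, such a stage could be sketched as a custom GraphStage along these lines (my sketch, not library code; it assumes a seqNr accessor, sequence numbers starting at first, and that every sequence number eventually arrives on exactly one of the two inputs):

import akka.stream._
import akka.stream.stage._

// Sketch of mergeNextSeqNr: emits elements strictly by sequence number,
// stashing at most one out-of-turn element per input port.
final class MergeNextSeqNr[T](seqNr: T => Long, first: Long = 0L)
    extends GraphStage[FanInShape2[T, T, T]] {

  override val shape = new FanInShape2[T, T, T]("MergeNextSeqNr")

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val ins      = List(shape.in0, shape.in1)
      private var expected = first
      private val stash    = scala.collection.mutable.Map.empty[Inlet[T], T]

      private def pullIfNeeded(in: Inlet[T]): Unit =
        if (!stash.contains(in) && !isClosed(in) && !hasBeenPulled(in)) pull(in)

      private def maybeComplete(): Unit =
        if (stash.isEmpty && ins.forall(isClosed(_))) completeStage()

      // Serve a stashed element if it is next in line, otherwise ask both
      // upstreams for more data.
      override def onPull(): Unit =
        stash.find { case (_, e) => seqNr(e) == expected } match {
          case Some((in, e)) =>
            stash -= in
            push(shape.out, e)
            expected += 1
            pullIfNeeded(in)
            maybeComplete()
          case None =>
            ins.foreach(pullIfNeeded)
        }

      private def handler(in: Inlet[T]) = new InHandler {
        override def onPush(): Unit = {
          val e = grab(in)
          if (isAvailable(shape.out) && seqNr(e) == expected) {
            push(shape.out, e)
            expected += 1
            pullIfNeeded(in)
          } else stash(in) = e // not its turn yet: wait for the other port
        }
        override def onUpstreamFinish(): Unit = maybeComplete()
      }

      setHandler(shape.out, this)
      ins.foreach(in => setHandler(in, handler(in)))
    }
}

injectSeqNr can then be as simple as a zipWithIndex that tags each message with its index, and MergeNextSeqNr drops in where the plain merge stage was.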

The end result should be exactly the same as using the wrapping flow above, but you might find it easier to adapt to your needs if you embed the logic.

answered 2018-02-16T13:55:35.247