I am trying to process batches of records from Kafka with reactor-kafka, like this:

    KafkaReceiver.create(
        kafkaProperties.createReceiverOptions<String, String>(String::class.java)
            .subscription(listOf("test_topic1"))
    )
        .receiveExactlyOnce(kafkaProducer.transactionManager())
        .concatMap {
            it.flatMap { record ->
                log.info("start handling $record")
                Thread.sleep(200)
                log.info("finish handling $record")
                Mono.just("OK")
            }
                .then(kafkaProducer.transactionManager().commit<String>())
        }
        .subscribe()

with max.poll.records = 5. It works, and I can see in the log:

13:46:50.303 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Emitting 5 records, requested now 1
13:46:50.310 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Paused - back pressure
...
13:46:51.517 [b851635e-caec-46bf-a195-0b05e18158b6-1] INFO  r.n.r.k.r.KafkaReceiveOnceFlowTest - start handling ConsumerRecord
13:46:51.727 [b851635e-caec-46bf-a195-0b05e18158b6-1] INFO  r.n.r.k.r.KafkaReceiveOnceFlowTest - finish handling ConsumerRecord
... ( start - finish 4 more times )
13:46:52.817 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Resumed
13:46:52.818 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Emitting 5 records, requested now 1
13:46:52.818 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Paused - back pressure
13:46:52.818 [b851635e-caec-46bf-a195-0b05e18158b6-1] DEBUG r.k.r.internals.ConsumerEventLoop - onRequest.toAdd 1
13:46:52.818 [b851635e-caec-46bf-a195-0b05e18158b6-1] INFO  r.n.r.k.r.KafkaReceiveOnceFlowTest - start handling ConsumerRecord
... ( etc. )

That is, the next poll is executed only after each batch has been fully processed.
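This sequencing comes from concatMap's demand model: it subscribes to one inner publisher at a time, and demand for the next outer element (which triggers the next poll) is only signaled once the inner pipeline completes. A plain-Kotlin analogy of that loop, with no Reactor and hypothetical record names:

```kotlin
// Plain-Kotlin analogy of concatMap's demand model: the source is "polled"
// only after the previous batch's inner pipeline (handle + commit) completed.
fun processSequentially(batches: List<List<String>>): List<String> {
    val events = mutableListOf<String>()
    val source = batches.iterator()
    while (source.hasNext()) {
        events.add("poll")                          // demand reaches the source
        val batch = source.next()
        batch.forEach { events.add("handle $it") }  // inner pipeline
        events.add("commit")                        // inner completes -> next demand
    }
    return events
}

fun main() {
    // prints [poll, handle r1, handle r2, commit, poll, handle r3, commit]
    println(processSequentially(listOf(listOf("r1", "r2"), listOf("r3"))))
}
```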

The problem appears when I try to process records in parallel by adding publishOn to the code above:

    ...
    .receiveExactlyOnce(kafkaProducer.transactionManager())
    .concatMap {
        it.publishOn(Schedulers.boundedElastic())
            .flatMap { record ->
                log.info("start handling $record")
                Thread.sleep(200)
                log.info("finish handling $record")
                Mono.just("OK")
            }
            .then(kafkaProducer.transactionManager().commit<String>())
    }
    ...

In this case, it looks like polls are executed independently of processing:

....
14:03:44.809 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Emitting 5 records, requested now 1
14:03:44.816 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Paused - back pressure
14:03:46.072 [boundedElastic-1] INFO  r.n.r.k.r.KafkaReceiveOnceFlowTest - start handling ConsumerRecord(topic = test_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1643627020072, serialized key size = 17, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = key-1643627020072, value = [B@37211011)
14:03:46.099 [kafka-producer-network-thread | producer-755eb08a-4bd3-4914-b22d-cd39f15c5e5e] INFO  o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-755eb08a-4bd3-4914-b22d-cd39f15c5e5e, transactionalId=755eb08a-4bd3-4914-b22d-cd39f15c5e5e] Discovered group coordinator localhost:9092 (id: 1001 rack: null)
14:03:46.350 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Resumed
14:03:46.351 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Emitting 5 records, requested now 1
14:03:46.351 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Paused - back pressure
14:03:46.352 [755eb08a-4bd3-4914-b22d-cd39f15c5e5e-1] DEBUG r.k.r.internals.ConsumerEventLoop - onRequest.toAdd 1
14:03:46.462 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Resumed
14:03:46.463 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Emitting 5 records, requested now 1
14:03:46.463 [755eb08a-4bd3-4914-b22d-cd39f15c5e5e-1] DEBUG r.k.r.internals.ConsumerEventLoop - onRequest.toAdd 1
14:03:46.463 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Paused - back pressure
14:03:46.570 [reactive-kafka-defaultConsumerGroup-1] DEBUG r.k.r.internals.ConsumerEventLoop - Resumed
14:03:46.585 [boundedElastic-1] INFO  r.n.r.k.r.KafkaReceiveOnceFlowTest - finish handling ConsumerRecord
14:03:46.585 [boundedElastic-1] INFO  r.n.r.k.r.KafkaReceiveOnceFlowTest - start handling ConsumerRecord
....

As you can see, records are polled before the previous batch has been processed. The desired behavior is to poll Kafka for the next batch only after the current batch is fully processed, while still processing the records within a batch concurrently. How can I achieve this?
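For comparison, outside Reactor this target behavior maps onto a plain thread pool: fan each batch out to workers, block until the whole batch is done, and only then fetch the next one. A JDK-only sketch (not the reactor-kafka API, just an illustration of the desired scheduling):

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Sketch of the target behavior with a plain thread pool (no Reactor):
// records within a batch run in parallel, batches themselves run one by one.
fun processBatches(batches: List<List<String>>): List<String> {
    val pool = Executors.newFixedThreadPool(4)
    val handled = mutableListOf<String>()
    try {
        for (batch in batches) {
            // invokeAll blocks until every record of this batch is processed,
            // so the next batch is "polled" only afterwards
            val futures = pool.invokeAll(batch.map { record ->
                Callable {
                    Thread.sleep(50)   // simulated per-record work
                    "OK:$record"
                }
            })
            futures.forEach { handled.add(it.get()) }  // results keep input order
        }
    } finally {
        pool.shutdown()
    }
    return handled
}

fun main() {
    println(processBatches(listOf(listOf("a", "b", "c"), listOf("d", "e"))))
}
```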

Any help or hints are appreciated.

UPD. After some struggle, I think I have found a solution:

    ....
    .receiveExactlyOnce(kafkaProducer.transactionManager())
    .concatMap({
        it.collectList()
            .flatMap { records ->
                Flux.fromIterable(records)
                    .parallel()
                    .runOn(Schedulers.boundedElastic())
                    .flatMap { record ->
                        log.info("start handling $record")
                        Thread.sleep(1000)
                        log.info("finish handling $record")
                        Mono.just("OK")
                    }
                    .sequential()
                    .then()
            }
            .then(kafkaProducer.transactionManager().commit<String>())
    }, 0)
    ....

This works perfectly, but now there is a follow-up problem: if I turn receiveExactlyOnce into a hot publisher (i.e. add .publish(1)), a similar issue occurs and onNext and batch processing start working independently again.

UPD 2. There is no problem with publish if you use prefetch = 1. You do get some "poll-ahead" behavior (which, I assume, is what the term "prefetch" implies), but the poll rate is effectively governed by the batch processing speed.
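That "poll-ahead" effect of prefetch = 1 can be pictured as a capacity-1 buffer between the poller and the processor: the poller may run at most one batch ahead, then blocks until the processor drains the buffer, so the processor's speed still paces the polling. A JDK-only analogy (not the reactor-kafka API):

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Analogy for prefetch = 1: a capacity-1 buffer between poller and processor.
// The poller can fetch at most one batch ahead; its overall rate is still
// bounded by how fast the processor drains the buffer.
fun runWithPrefetchOne(batchCount: Int): List<Int> {
    val buffer = ArrayBlockingQueue<Int>(1)  // "prefetch" of one batch
    val processed = mutableListOf<Int>()
    val poller = thread {
        repeat(batchCount) { n ->
            buffer.put(n)                    // blocks while the single slot is full
        }
    }
    repeat(batchCount) {
        Thread.sleep(20)                     // slow batch processing
        processed.add(buffer.take())         // draining releases the poller
    }
    poller.join()
    return processed
}

fun main() {
    println(runWithPrefetchOne(3))           // prints [0, 1, 2]
}
```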

Problem solved.
