1

我有一个从 kafka 消费并将数据存储到数据库的服务。简化逻辑如下:

Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();
kafkaFlux.flatMap(r -> store(r))// IO operation, store to database
    .subscribe(record -> record.receiverOffset().acknowledge()); // Ack the record

使flatMap通量无序。根据反应堆 kafka 文档,acknowledge()可以确认尚未存储到数据库的记录: https ://projectreactor.io/docs/kafka/snapshot/api/reactor/kafka/receiver/ReceiverOffset.html

确认与此偏移关联的 ReceiverRecord。偏移量将根据提交配置参数 ReceiverOptions.commitInterval() 和 ReceiverOptions.commitBatchSize() 自动提交。当一个偏移量被确认时,假定该分区中直到并包括该偏移量的所有记录都已被处理。当接收者 Flux 终止时,如果可能,所有已确认的偏移量都将被提交。

如何保证至少一次但不阻塞流?

4

0 回答 0