我有一个反应器 Kafka 项目,它使用来自 Kafka 主题的消息,转换消息,然后写入另一个主题。
public Flux<String> consume(String destTopic) {
return kafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.doOnNext(s-> sendToKafka(s,destTopic))
.map(ConsumerRecord::value)
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
我的理解是只有在反应器中成功完成所有序列步骤后才会提交偏移量。那是对的吗?我想确保不处理下一条记录,除非当前记录成功发送到目标 Kafka 主题。