
I have a Reactor Kafka project that consumes messages from a Kafka topic, transforms the message, and then writes it to another topic.

public Flux<String> consume(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s -> sendToKafka(s, destTopic))
                .map(ConsumerRecord::value)
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

My understanding is that the offset is only committed once all the steps of the sequence in the reactive pipeline have completed successfully. Is that correct? I want to make sure that the next record is not processed unless the current record was successfully sent to the destination Kafka topic.


2 Answers


If you want to control the commit behavior, you need to disable auto-commit like this:

ReceiverOptions.create()
    .commitInterval(Duration.ZERO)
    .commitBatchSize(0)

Then you need to commit after the record has been processed:

final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create()
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0)
        .subscription(List.of("mytopic"));

sender.send(KafkaReceiver.create(receiverOptions)
                .receive()
                .map(m -> SenderRecord.create(transform(m.key(), m.value()), m.receiverOffset()))) // transform the data
        .doOnNext(m -> m.correlationMetadata().commit().block()); // synchronous commit after the record is successfully delivered
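
For completeness, here is one way the surrounding wiring might look as a runnable sketch. The broker address, group id, topic names, and the toUpperCase() stand-in for the transform method are all illustrative assumptions, not part of the answer above:

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class ManualCommitPipeline {

    public static void main(String[] args) {
        // Consumer: auto-commit disabled via zero commit interval and zero batch size
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(Map.of(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // assumed broker address
                        ConsumerConfig.GROUP_ID_CONFIG, "my-group",                // assumed group id
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class))
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .subscription(List.of("mytopic"));

        // Producer
        KafkaSender<String, String> sender = KafkaSender.create(SenderOptions.<String, String>create(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)));

        sender.send(KafkaReceiver.create(receiverOptions)
                        .receive()
                        // carry the ReceiverOffset along as correlation metadata
                        .map(m -> SenderRecord.create(
                                new ProducerRecord<>("desttopic", m.key(), m.value().toUpperCase()), // stand-in transform
                                m.receiverOffset())))
                .doOnNext(result -> result.correlationMetadata().commit().block()) // commit only after a successful send
                .blockLast(); // blocks this demo thread; a real app manages the subscription lifecycle
    }
}

The commit().block() call trades throughput for the strict "commit before moving on" guarantee; acknowledge() together with a non-zero commit interval or batch size is the higher-throughput alternative.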
answered 2021-09-21T08:46:02.627
1

The implementation looks like this:

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
    return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
        .receive()
        .filter(it -> !it.isEmpty())
        .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
        .map(consumerRecords -> Flux.fromIterable(consumerRecords)
            .doAfterTerminate(() -> {
                for (ConsumerRecord<K, V> r : consumerRecords) {
                    handler.acknowledge(r);
                }
            })));
}

So each ConsumerRecords batch is acknowledged only once its Flux has been fully processed, whether it completed successfully or with an error. It is therefore not a per-record commit. Technically it could not be per-record anyway: we only need commits so that, when our consumer application fails, we can resume from the offset where we left off. The currently active KafkaConsumer keeps its position in memory and does not care whether you commit or not.
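
Applied to the question's pipeline, the ordering requirement ("do not process the next record until the current one was sent") can still be met under receiveAutoAck() by flattening with concatMap, which requests the next element only after the inner publisher completes. A minimal sketch, assuming a pre-configured KafkaSender named sender (an assumption, not something this answer defines):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

// Sketch only: kafkaConsumerTemplate, sender, and destTopic are assumed to be configured elsewhere.
public Flux<String> consumeSequentially(ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate,
                                        KafkaSender<String, String> sender,
                                        String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            // concatMap (unlike flatMap) subscribes to one inner publisher at a time,
            // so the next record is not processed until the current send completes
            .concatMap(record -> sender
                    .send(Mono.just(SenderRecord.create(
                            new ProducerRecord<>(destTopic, record.key(), record.value()),
                            record.offset())))
                    .then(Mono.just(record.value())));
}

Note that this controls processing order only; as described above, the offsets are still acknowledged per batch, not per record.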

If you really want "per record", look at ReactiveKafkaConsumerTemplate.receive() and the KafkaReceiver.receive() it delegates to:

/**
 * Starts a Kafka consumer that consumes records from the subscriptions or partition
 * assignments configured for this receiver. Records are consumed from Kafka and delivered
 * on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
 * when the returned Flux terminates.
 * <p>
 * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
 * to commit the offset corresponding to the record. Acknowledged records are committed
 * based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
 * Records may also be committed manually using {@link ReceiverOffset#commit()}.
 *
 * @return Flux of inbound receiver records that are committed only after acknowledgement
 */
default Flux<ReceiverRecord<K, V>> receive() {
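
A minimal sketch of that per-record variant, combining receive() with a manual commit() for each record; the receiver, sender, and destTopic parameters below are illustrative assumptions, not part of the quoted Javadoc:

import org.apache.kafka.clients.producer.ProducerRecord;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

// Sketch only: receiver, sender, and destTopic are assumed to be configured elsewhere.
public Flux<Void> relayWithPerRecordCommit(KafkaReceiver<String, String> receiver,
                                           KafkaSender<String, String> sender,
                                           String destTopic) {
    return receiver.receive()
            .concatMap(record -> sender
                    .send(Mono.just(SenderRecord.create(
                            new ProducerRecord<>(destTopic, record.key(), record.value()),
                            record.receiverOffset())))
                    // commit() forces an immediate commit of this record's offset;
                    // acknowledge() would instead defer to commitInterval/commitBatchSize
                    .concatMap(result -> result.correlationMetadata().commit()));
}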
answered 2021-09-21T13:42:50.483