
There is a Kafka topic to which messages arrive. I need to read a message, process it, and move on to the next one. Processing a message may fail; if it does, processing has to be retried a few times (say, 10) before moving on to the next message. If processing fails 10 times, the message needs to be dropped and we should continue with the next one.

We use reactor-kafka, and all of the processing has to be reactive.

Here is how I tried to solve this:

Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .subscribe();

(Here, receiver is a KafkaReceiver<String, String>.)

This works when there are no exceptions; if there is one, processRecord() is retried 10 times. The problem is that if it still fails after the 10 allowed attempts, the offset is not committed (of course), so the next time the same offset is read from Kafka; effectively, processing gets stuck forever on the 'bad' offset.

I then tried to implement the obvious idea: if an exception 'propagates past' the retryBackoff() operator, commit the current offset. Committing an offset requires a ReceiverRecord, so I wrap the exception in an ExceptionWithRecord together with the current record:

// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))
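
For completeness, a minimal sketch of the wrapper, assuming it simply stores the record alongside the original cause (the exact class is not shown here):

// hypothetical sketch: an unchecked exception that carries the failed record
public class ExceptionWithRecord extends RuntimeException {
    private final ReceiverRecord<String, String> record;

    public ExceptionWithRecord(ReceiverRecord<String, String> record, Throwable cause) {
        super(cause);
        this.record = record;
    }

    public ReceiverRecord<String, String> record() {
        return record;
    }
}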

Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .onErrorResume(this::extractRecordAndMaybeCommit)
        .subscribe();

extractRecordAndMaybeCommit() extracts the ReceiverRecord from the given exception and commits its offset:

return record.receiverOffset().commit();
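
In full, the handler might look roughly like this (a sketch; note that retryBackoff() wraps the final error when the retries are exhausted, so the cause chain has to be walked):

// hypothetical sketch of the full handler
private Mono<Void> extractRecordAndMaybeCommit(Throwable ex) {
    // retryBackoff() signals exhaustion with a wrapper exception,
    // so look for our ExceptionWithRecord along the cause chain
    for (Throwable t = ex; t != null; t = t.getCause()) {
        if (t instanceof ExceptionWithRecord) {
            ReceiverRecord<String, String> record = ((ExceptionWithRecord) t).record();
            return record.receiverOffset().commit();
        }
    }
    return Mono.error(ex); // not our wrapper: propagate
}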

This way of smuggling the record out and committing it later 'works': when the retries are exhausted, the .commit() method does get called. It just has no effect.

It turns out that as soon as any exception enters the reactive pipeline above, DefaultKafkaReceiver.dispose() is called, so any subsequent commit attempt is ignored. So it appears that once the publisher has 'seen' any exception, committing an offset becomes impossible altogether.

How can I implement this 'commit after N errors' behavior while still using reactor-kafka?


1 Answer


I was unable to find a 'proper' and easy way to solve this task, so I had to resort to the brute force of state and side effects: count the retries manually and stop retrying when the number of attempts exceeds the limit.

Here is the counter:

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.kafka.common.TopicPartition;
import reactor.kafka.receiver.ReceiverOffset;

public class RetryCounter {
    private final Map<TopicPartition, OffsetAttempts> partitionOffsets = new ConcurrentHashMap<>();

    public void onRecord(ReceiverOffset partitionOffset) {
        var offsetAttempts = offsetAttemptsFor(partitionOffset);
        offsetAttempts.increaseAttemptNumber(partitionOffset.offset());
        offsetAttempts.pruneTooAncientFor(partitionOffset.offset());
    }

    public long currentAttemptFor(ReceiverOffset partitionOffset) {
        var offsetAttempts = offsetAttemptsFor(partitionOffset);
        return offsetAttempts.currentAttemptFor(partitionOffset.offset());
    }

    private OffsetAttempts offsetAttemptsFor(ReceiverOffset partitionOffset) {
        return partitionOffsets.computeIfAbsent(partitionOffset.topicPartition(), key -> new OffsetAttempts());
    }

    private static class OffsetAttempts {
        private final NavigableMap<Long, Long> offsetAttempts = new ConcurrentSkipListMap<>();

        // this must exceed your Kafka batch size
        private static final int ANTIQUITY_SPREAD_THRESHOLD = 10000;

        public void increaseAttemptNumber(long offset) {
            // first sighting stores 0 (attempt numbering is zero-based);
            // each subsequent sighting of the same offset increments the count
            offsetAttempts.merge(offset, 0L, (oldValue, value) -> oldValue + 1);
        }

        public long currentAttemptFor(long offset) {
            return offsetAttempts.getOrDefault(offset, 0L);
        }

        @Override
        public String toString() {
            return offsetAttempts.toString();
        }

        public void pruneTooAncientFor(long offset) {
            // drop bookkeeping for offsets far behind the current one
            // so the map does not grow without bound
            long antiquityThreshold = offset - ANTIQUITY_SPREAD_THRESHOLD;

            offsetAttempts.headMap(antiquityThreshold).clear();
        }
    }
}
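
Note that the counting is zero-based: the first delivery of an offset is recorded as attempt 0 and every redelivery increments it, which is why the pipeline below gives up once the count reaches 10 (i.e. on the 11th delivery). A tiny standalone demonstration of the merge() semantics used above:

import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class MergeDemo {
    public static void main(String[] args) {
        NavigableMap<Long, Long> attempts = new ConcurrentSkipListMap<>();
        attempts.merge(42L, 0L, (oldValue, value) -> oldValue + 1); // first delivery -> 0
        attempts.merge(42L, 0L, (oldValue, value) -> oldValue + 1); // first retry    -> 1
        attempts.merge(42L, 0L, (oldValue, value) -> oldValue + 1); // second retry   -> 2
        System.out.println(attempts.get(42L)); // prints 2
    }
}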

Then we count the retries for each offset (independently for each partition) and stop processing when the retry limit is exceeded:

RetryCounter counter = new RetryCounter();
Flux.defer(receiver::receive)
        .concatMap(record -> {
            counter.onRecord(record.receiverOffset());
            if (counter.currentAttemptFor(record.receiverOffset()) >= 10) {
                // we already tried 10 times, this is the 11th; log the final error
                // and skip processRecord() entirely, so that no error enters the
                // reactive pipeline and we are able to commit
                logFinalError(record);
                return Mono.just(record).flatMap(this::commitRecord);
            } else {
                return processRecord(record).thenReturn(record).flatMap(this::commitRecord);
            }
        })
        .retryBackoff(Long.MAX_VALUE, ofMillis(500))
        .subscribe();
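
A caveat for later versions: retryBackoff() has since been deprecated and was removed in Reactor 3.4. To the best of my knowledge, the equivalent there is retryWhen() with the Retry.backoff() spec; processAndCommit() below is a hypothetical stand-in for the concatMap body above:

import java.time.Duration;
import reactor.util.retry.Retry;

// equivalent pipeline on Reactor 3.4+, where retryBackoff() no longer exists
Flux.defer(receiver::receive)
        .concatMap(record -> processAndCommit(record)) // same per-record logic as above
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(500)))
        .subscribe();
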
answered 2020-04-16T17:30:53