I am building an application in which each topic partition has multiple consumers, so reads from the topic are concurrent. I followed this link to make sure a new consumer is created whenever an existing consumer stops — `.repeat` is what creates the new consumer. I have been trying to test this scenario:
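For context, `repeat` with a `BooleanSupplier` resubscribes to the upstream each time it completes normally, for as long as the supplier returns true. A minimal plain-Java sketch of that resubscription loop (not Reactor itself; `runWithRepeat` and its types are illustrative stand-ins):

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class RepeatSketch {
    // Hypothetical model: "subscribe" to a source (here just a Supplier) and,
    // after each normal completion, ask the predicate whether to resubscribe.
    static int runWithRepeat(Supplier<Integer> source, BooleanSupplier shouldRepeat) {
        int subscriptions = 0;
        do {
            subscriptions++;
            source.get(); // one full, normally-completed run of the upstream
        } while (shouldRepeat.getAsBoolean());
        return subscriptions;
    }

    public static void main(String[] args) {
        int[] remaining = {2}; // allow two repeats after the first run
        int count = runWithRepeat(() -> 42, () -> remaining[0]-- > 0);
        System.out.println(count); // 3 subscriptions in total
    }
}
```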

Here is my code and the test:

@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}


public void run(String... args) {
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}


public Flux<String> readWrite(String destTopic) {
        AtomicBoolean repeatConsumer = new AtomicBoolean(false);
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                //.doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                .doOnNext(s -> sendToKafka(s, destTopic))
                .map(ConsumerRecord::value)
                .doOnNext(record -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), record))
                .doOnError(exception -> log.debug("Error occurred while processing the message, attempting retry. Error message: {}", exception.getMessage()))
                .retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
                .onErrorContinue((exception,errorConsumerRecord)->{
                    ReceiverRecordException recordException = (ReceiverRecordException)exception;
                    log.debug("Retries exhausted for : {}", recordException);
                    recordException.getRecord().receiverOffset().acknowledge();
                    repeatConsumer.set(true);
                })
                .repeat(repeatConsumer::get); // will create a new consumer if the existing consumer stops
    }


public class ReceiverRecordException extends RuntimeException {
    private final ReceiverRecord record;

    ReceiverRecordException(ReceiverRecord record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord getRecord() {
        return this.record;
    }
}
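The question does not show `sendToKafka`; for the cast inside `onErrorContinue` to succeed, a failure would presumably have to be wrapped in `ReceiverRecordException` somewhere along the chain. A minimal, hypothetical sketch of that wrapping in plain Java (`FakeRecord` is an illustrative stand-in, not reactor-kafka's `ReceiverRecord`):

```java
import java.util.function.Consumer;

public class WrapSketch {
    // Stand-in for reactor-kafka's ReceiverRecord (hypothetical).
    static class FakeRecord {
        final String value;
        FakeRecord(String value) { this.value = value; }
    }

    // Mirrors the question's ReceiverRecordException(record, t) shape.
    static class RecordException extends RuntimeException {
        final FakeRecord record;
        RecordException(FakeRecord record, Throwable t) { super(t); this.record = record; }
    }

    // Wrap any send failure so downstream error handling can recover the record
    // (and therefore its offset) from the exception.
    static void send(FakeRecord r, Consumer<FakeRecord> producer) {
        try {
            producer.accept(r);
        } catch (RuntimeException e) {
            throw new RecordException(r, e);
        }
    }

    public static void main(String[] args) {
        try {
            send(new FakeRecord("m1"), rec -> { throw new RuntimeException("send failed"); });
        } catch (RecordException e) {
            System.out.println(e.record.value); // m1
        }
    }
}
```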

The test:

@Test
public void readWriteCreatesNewConsumerWhenCurrentConsumerStops() {
    AtomicInteger recordNumber = new AtomicInteger(0);
    Mockito
            .when(reactiveKafkaConsumerTemplate.receiveAutoAck())
            .thenReturn(
                    Flux.create(consumerRecordFluxSink -> {
                        if (recordNumber.getAndIncrement() < 5) {
                            consumerRecordFluxSink.error(new RuntimeException("Kafka down"));
                        } else {
                            consumerRecordFluxSink.next(createConsumerRecord(validMessage));
                            consumerRecordFluxSink.complete();
                        }
                    })
            );

    Flux<String> actual = service.readWrite(destinationTopic);

    StepVerifier.create(actual)
            .verifyComplete();

}

When I run the test, I get a retries-exhausted error - onError(reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3 in a row (3 total))

My understanding is that onErrorContinue should catch the exception and then continue with the next record. But it looks like it is raising the exception instead. And since the exception is raised, how does repeat() come into play? I would appreciate any help in understanding how to test this scenario.
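To make the point of confusion concrete, here is a plain-Java model of the two terminal paths (an assumption based on Reactor's documented semantics, not Reactor code): a repeat-style loop only consults its predicate after a normal completion, while an error signal terminates the sequence without ever reaching that predicate.

```java
import java.util.function.BooleanSupplier;

public class SignalSketch {
    // Minimal model of a source that signals either completion or an error.
    interface Source { void subscribe(Runnable onComplete, Runnable onError); }

    // repeat-like loop: resubscribes only after a *completion* signal;
    // an error ends the sequence without consulting the predicate.
    static String run(Source source, BooleanSupplier repeatPredicate) {
        final String[] outcome = {""};
        boolean again = true;
        while (again) {
            final boolean[] completed = {false};
            source.subscribe(() -> completed[0] = true, () -> outcome[0] = "errored");
            if (!completed[0]) return outcome[0];   // error path: no repeat
            again = repeatPredicate.getAsBoolean(); // completion path: ask predicate
        }
        return "completed";
    }

    public static void main(String[] args) {
        System.out.println(run((c, e) -> e.run(), () -> true));  // errored
        System.out.println(run((c, e) -> c.run(), () -> false)); // completed
    }
}
```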
