1

我怎么能去“@Override public void onFailure(Throwable ex) { ... }”?

它总是去“@Override public void onSuccess(SendResult<String, KafkaPresponseDto> result) {...}。

我想打印 log.error("Unable to send message=["+kafkaPgResponseDto.toString()+"] due to : " + ex.getMessage());

请帮忙...

ListenableFuture<SendResult<String, KafkaPgResponseDto>> future = pgResponseKafkaTemplate.send(kurlyTopicNamePgResponse, kafkaPgResponseDto);
        future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaPgResponseDto>>(){
            @Override
            public void onSuccess(SendResult<String, KafkaPgResponseDto> result) {
                KafkaPgResponseDto kafkaPgResponseDto = result.getProducerRecord().value();
                log.debug("Send message=["+kafkaPgResponseDto.toString()+"] with offset=["+result.getRecordMetadata().offset()+"]");
            }
            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send message=["+kafkaPgResponseDto.toString()+"] due to : "+ex.getMessage());
                kafkaTransactionService.failedProcessingKafkaHistorySave(orderNo, kurlyTopicNamePgResponse, gson.toJson(payload), ex.toString());
            }
        });
4

1 回答 1

1

我相信真正的 Kafka 不需要测试你的功能。考虑使用MockProducerfor 注入KafkaTemplate并模拟这种onFailure()情况下的错误:https ://www.baeldung.com/kafka-mockproducer

于 2021-11-12T14:01:16.910 回答