0

我有以下代码将数据发送到 Kafka。我能够成功发送测试 OnSucess 回调,但我无法测试 onFailure() 方法

我收到以下错误

java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock$2137573915 不能转换为 org.springframework.kafka.core.KafkaProducerException

@Test
    void test() throws InterruptedException, ExecutionException {

         Throwable ex = mock(Throwable.class);
                 Employee employee = new Employee();

        when(kafkaTemplate.send(null,employee )).thenReturn(responseFuture);
            when(sendResult.getProducerRecord()).thenReturn(producerRecord);
            when(producerRecord.value()).thenReturn(employee);

            doAnswer(invocationOnMock -> {
                ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
                
                listenableFutureCallback.onFailure(ex);
               
                return null;
            }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
            
            kafkaSender.sendMessage(employee);
            
         }

@Service
public class KafkaSender{
    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;
    public void sendMessage(Employee employee) {

        ObjectMapper objectMapper = new ObjectMapper();

        ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic,employee);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {

            @Override
            public void onSuccess(SendResult<String, Employee> result) {

                
                saveInDatabaseMethod(result.getProducerRecord()); // method to save in DB

            }

            @Override
            public void onFailure(Throwable ex) {

                                 // class cast exception occur here
                ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();


                saveInDatabaseMethod(producerRecord);

            }

            }
4

1 回答 1

1
ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();

您的模拟不是用 KPE 调用回调,而是用这个调用它

Throwable ex = mock(Throwable.class);

您需要将其包装在 KPE 中。

于 2021-03-10T14:28:14.500 回答