我有以下代码将数据发送到 Kafka。我能够成功发送测试 OnSucess 回调,但我无法测试 onFailure() 方法
我收到以下错误
java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock$2137573915 不能转换为 org.springframework.kafka.core.KafkaProducerException
@Test
void test() throws InterruptedException, ExecutionException {
Throwable ex = mock(Throwable.class);
Employee employee = new Employee();
when(kafkaTemplate.send(null,employee )).thenReturn(responseFuture);
when(sendResult.getProducerRecord()).thenReturn(producerRecord);
when(producerRecord.value()).thenReturn(employee);
doAnswer(invocationOnMock -> {
ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
listenableFutureCallback.onFailure(ex);
return null;
}).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
kafkaSender.sendMessage(employee);
}
@Service
public class KafkaSender{
@Autowired
private KafkaTemplate<String, Employee> kafkaTemplate;
public void sendMessage(Employee employee) {
ObjectMapper objectMapper = new ObjectMapper();
ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic,employee);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
@Override
public void onSuccess(SendResult<String, Employee> result) {
saveInDatabaseMethod(result.getProducerRecord()); // method to save in DB
}
@Override
public void onFailure(Throwable ex) {
// class cast exception occur here
ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
saveInDatabaseMethod(producerRecord);
}
}