0

所以我需要为我的 kafkalistener 方法创建一个集成测试,其中测试期望 ListenerExecutionFailedException 实际上被抛出,因为消息在消费期间由于另一个服务处于非活动状态而失败。

下面是测试代码,我使用 Embeddedkafkabroker 作为生产者和消费者:

@Test(expected = ListenerExecutionFailedException.class)
public void shouldThrowException() {

    RecordHeaders recordHeaders = new RecordHeaders();
    recordHeaders.add(new RecordHeader("messageType", "bootstrap".getBytes()));
    recordHeaders.add(new RecordHeader("userId", "123".getBytes()));
    recordHeaders.add(new RecordHeader("applicationId", "1234".getBytes()));
    recordHeaders.add(new RecordHeader("correlationId", UUID.randomUUID().toString().getBytes()));

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
        "TEST_TOPIC",
        1,
        null,
        "message",
        "",
        recordHeaders);
    producer.send(producerRecord);


    consumer.subscribe(Collections.singleton("TEST_TOPIC"));

    consumer.poll(Duration.ofSeconds(2));

}

我想知道的是异常被认为没有被抛出并且测试失败,即使我知道消息确实被侦听器接收并且异常被抛出,因为我在日志上看到它们。

即使我将预期更改为 Throwable 似乎也没有检测到异常。

我应该怎么做才能让Junit检测到异常?

另外,另一个有趣的事情是我试图模拟在侦听器中调用的服务类并返回一些虚拟值,但是当我使用 Mockito.verify 时没有调用该服务

4

1 回答 1

0

你好像有什么误会。

producer.send

consumer.poll

您直接调用 kafka-clients 并且在此测试中根本没有使用 Spring。

ListenerExecutionFailedException是 Spring 的侦听器容器包装了消息侦听器抛出的用户异常的异常。

于 2020-03-19T14:15:49.840 回答