0

如何对实现 spring-kafka MessageListener 接口的类进行单元测试?我有一个监听器类,我正在使用 onMessage 函数手动监听主题。这个函数很简单,只是接收消息。

我的设置是使用 Spring 5.8、Spring-Kafka 2.2.7、Spring-Kafka-Test、JUnit 和没有spring boot。

我一直在尝试来自 Spring 参考文档和其他帖子的一系列不同示例,但似乎没有一个显示一种简单的方法来测试实现 MessageListener 的 Listener 类。

我不确定是否需要设置 EmbeddedKafkaBroker 或 EmbeddedKafkaRule,或者是否有不同的测试方法。当我尝试使用 EmbeddedKafkaRule 时,我收到一条错误消息,显示 NoClassDefFound。

但是我不明白这个测试用例如何影响我的 onMessage 函数。

@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class listenerTest {

    private String topic = "someTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic);

    private CountDownLatch countDownLatch;

    @Before
    public void setUpTests (){
        Map<String, Object> sProps = KafkaTestUtils.senderProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString());

        ProducerFactory producer = new DefaultKafkaProducerFactory<String, String> (sProps);

        kafkaTemplate = new KafkaTemplate<>(producer);

        kafkaTemplate.setDefaultTopic(topic);

        countDownLatch = new CountDownLatch (1);

    }

    @Test
    public void testReceiver(){
         kafkaTemplate.sendDefault("message");
         assertEquals(latch.getCount(), 0);
    }

我想单元测试的课程

public class listener implements BatchAcknowledgingMessageListener<String, String>{

    private CallbackInterface callback;

    public listener(CallbackInterface callback){
        this.callbackI = callback;
    }

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
         this.callbackI.handleMessage();
         ack.acknowledge();
    }
}

这会引发一个奇怪的错误,上面写着这个...... NoClassDefFound

4

1 回答 1

2

对于纯单元测试,您不需要嵌入式代理,您应该直接调用侦听器。

注入一个模拟回调并验证它是否被正确调用。

当我尝试直接调用 onMessage 函数对其进行测试时,我收到一条错误消息,提示 Container 不应调用函数 onMessage。

你叫错了onMessage...

public interface BatchMessageListener extends MessageListener {

    @Override
    default void onMessage(Message message) {
        throw new UnsupportedOperationException("Should never be called by the container");
    }

    @Override
    void onMessageBatch(List<Message> messages);

}

编辑

public class MyListener implements BatchAcknowledgingMessageListener<String, String> {

    private final MyService service;

    public MyListener(MyService service) {
        this.service = service;
    }

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
        data.forEach(dat -> this.service.call(dat.value()));
        acknowledgment.acknowledge();
    }

    public interface MyService {

        void call(String toCall);

    }

}

class So57192362ApplicationTests {

    @Test
    void test() {
        MyService service = mock(MyService.class);
        MyListener listener = new MyListener(service);
        Acknowledgment acknowledgment = mock(Acknowledgment.class);
        listener.onMessage(Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, null, "bar")), acknowledgment);
        verify(service).call("bar");
        verify(acknowledgment).acknowledge();
        verifyNoMoreInteractions(service, acknowledgment);
    }

}
于 2019-07-25T13:34:56.500 回答