0

我有一个使用 @KafkaListener 注解的 Kafka 消费者。为了使用嵌入式 Kafka 服务器编写测试用例,我参照了这个问题中的代码:《如何为 @KafkaListener 编写单元测试?》。但示例代码中建议的方法在我这里不起作用。我的代码唯一的区别是消息的值不是字符串,因此我相应地为 producerFactory 配置了 JsonSerializer。

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
@DirtiesContext
public class SpringKafkaReceiverTest {

    private static String RECEIVER_TOPIC="messageRouterTopic";
    
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, RECEIVER_TOPIC);
    
    @Mock
    Processor processor;
    
    @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    private KafkaTemplate<String, Envelope> template;
    
    
    @Bean
    private ProducerFactory<String, Envelope> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getEmbeddedKafka().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Before
    public void setup() {
        
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
        template = new KafkaTemplate<>(producerFactory());
        MockitoAnnotations.initMocks(this);
    }
    
    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(RECEIVER_TOPIC);
        
        container.stop();
        
        @SuppressWarnings("unchecked")
        AcknowledgingConsumerAwareMessageListener<String, Envelope> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Envelope>) container
                .getContainerProperties().getMessageListener();
        
        
        CountDownLatch latch = new CountDownLatch(1);
    
        container.getContainerProperties().setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Envelope>(){

            @Override
            public void onMessage(ConsumerRecord<String, Envelope> data, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer) {
                messageListener.onMessage(data, acknowledgment, consumer);
                latch.countDown();
            }
            
        });
        
        container.start();
        Envelope message = new Envelope();
        message.setAction("sampleAction");
        message.setValue("This is the object for testing purpose");

        // These are the void methods
        // Since these methods will be invoked when the listener method listens to the Kafka topic, it is not explicitly called here.
         
        doNothing().when(processor).setCommand(any());
        doNothing().when(processor).allocateEnvelopes(any());

      
        template.send(RECEIVER_TOPIC, message);
        verify(processor, Mockito.times(1)).allocateEnvelopes(any());
        //assertTrue(latch.await(10, TimeUnit.SECONDS));
    }
}

引导应用程序的类是:

@SpringBootApplication
public class PostOfficeApplication {

    private static ApplicationContext appContext;
    
    public static void main(String[] args) {
        appContext = SpringApplication.run(PostOfficeApplication.class, args);
    }

    
    public static ApplicationContext getApplicationContext() {
        return appContext;
    }
    
    
}

Kafka监听器代码如下:

    /**
     * Builds the consumer factory for {@code String} keys and JSON-encoded
     * {@code Envelope} values in the {@code postOfficeGrp} group.
     *
     * @param consumerBootstrapServers broker addresses the consumers connect to
     * @return a factory configured with the properties and deserializer instances below
     */
    @Bean
    public ConsumerFactory<String, Envelope> consumerFactory(List<String> consumerBootstrapServers) {
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Envelope> valueDeserializer = new JsonDeserializer<>(Envelope.class);

        Map<String, Object> props = new HashMap<>();
        // Connection and group membership.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "postOfficeGrp");
        // With no committed offset, start from the end of the log.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Deserializer classes; matching instances are also passed to the factory below.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
    }

    /**
     * Creates the listener container factory that {@code @KafkaListener} methods refer
     * to by the bean name {@code kafkaListenerFactory}.
     *
     * @param consumerFactory factory that supplies the consumers for the containers
     * @return a container factory using a {@code SeekToCurrentErrorHandler}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Envelope> kafkaListenerFactory(ConsumerFactory<String, Envelope> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Envelope> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setErrorHandler(new SeekToCurrentErrorHandler());
        containerFactory.setConsumerFactory(consumerFactory);
        return containerFactory;
    }

@Service
public class Mailbox {

    @Autowired
    Processor processor;
            
    @KafkaListener(id="messageRouterTopic" , topics = "${app.topic}", groupId = "postOfficeGrp", containerFactory = "kafkaListenerFactory")
    public void processKafkaMessages(Envelope object) {

    processor.setCommand(PostOfficeApplication.getApplicationContext().getBean(object.getAction(), Command.class));
    processor.allocateEnvelopes(object);
        
    }
}

当我运行 mvn test 时,我在控制台中看到如下错误:

[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.975 s <<< FAILURE! - in com.walmartlabs.gioda.po.consumer.SpringKafkaReceiverTest
[ERROR] test  Time elapsed: 0.246 s  <<< ERROR!
java.lang.NullPointerException
    at SpringKafkaReceiverTest.test(SpringKafkaReceiverTest.java:84)

[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Errors: 
[ERROR]   SpringKafkaReceiverTest.test:84 NullPointer
[INFO] 
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  9.286 s
[INFO] Finished at: 2020-08-07T10:07:09-05:00
[INFO] ------------------------------------------------------------------------

每次我运行代码时,container.stop()都会因 NullPointerException 而失败。我该如何解决这个问题?我错过了什么吗?

4

0 回答 0