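I am trying to verify that a service is called when my consumer receives a message from a Kafka topic, but the test fails and the consumer never even receives the message.
My test: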
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ConsumerTest {

    @Autowired
    Consumer consumer;

    @Mock
    private Service service;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()
            throws Exception {
        String message = "hi";
        kafkaTemplate.send("topic", message);
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        verify(service, times(1)).addMessage();
    }
}
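The test depends on consumer.getLatch(), but the consumer class itself is not shown. As a rough sketch only, assuming the consumer counts down a CountDownLatch each time its listener method runs, the pattern could look like the plain-Java stand-in below. The names LatchConsumerSketch, receive, getPayload and awaitMessage are hypothetical, and the Spring annotation appears only in a comment so the example runs without Kafka. It also shows that await returns a boolean, which the test above discards; asserting on that value would separate "no message arrived" from "the service was not called".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer; not the actual class from the question.
class LatchConsumerSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String payload;

    // In the real application this method would carry
    // @KafkaListener(topics = "topic") and be invoked by the listener container.
    void receive(String message) {
        payload = message;
        latch.countDown(); // signal the waiting test thread
    }

    CountDownLatch getLatch() {
        return latch;
    }

    String getPayload() {
        return payload;
    }

    // Returns true only if a message arrived before the timeout elapsed.
    boolean awaitMessage(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class LatchConsumerDemo {
    public static void main(String[] args) {
        // Simulate delivery from another thread, as a listener container would.
        LatchConsumerSketch delivered = new LatchConsumerSketch();
        new Thread(() -> delivered.receive("hi")).start();
        System.out.println("delivered in time: " + delivered.awaitMessage(1000));

        // Nothing is ever delivered here, so the await times out.
        LatchConsumerSketch silent = new LatchConsumerSketch();
        System.out.println("delivered in time: " + silent.awaitMessage(100));
    }
}
```

In a test, checking the boolean result of the await before calling verify makes a timeout show up as its own failure instead of as a failed verification.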
The consumer in the main package is an ordinary consumer annotated with @KafkaListener(topics = "topic"). I also have a configuration file:
@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
And in application.properties (inside the test package) I put this:
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: group