我有同样的问题并通过创建以下类来解决它
@Primary
@Service
class MyCustomConsumerForTest(
    // NOTE: the original snippet used a line comment here (`MyConsumer(// Services) {`)
    // which commented out the closing `) {` and broke compilation.
    service: Service,
) : MyConsumer(service) {
    // Released exactly once, after the first message has been fully processed,
    // so the test can await delivery instead of sleeping.
    val latch = CountDownLatch(1)

    override fun listen(message: String) {
        super.listen(message)
        latch.countDown()
    }
}
和我的消费者
@Service
// `open` is required on both the class and `listen`: Kotlin classes/functions are
// final by default, and the test subclass overrides `listen`.
open class MyConsumer(
    // The constructor property was missing its type in the original, which does not compile.
    private val service: Service,
) {
    @KafkaListener(topics = ["topic"])
    open fun listen(message: String) {
        // Process the incoming message.
        service.somefunction(foo)
        // ...
    }
}
和我的测试
@EnableKafka
@SpringBootTest(classes = [MyCustomConsumerForTest::class,
    KafkaConfig::class])
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = [
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092"
    ])
@ActiveProfiles("test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CampaignDataValidatorIntegrationTest {
    private val TOPIC_KAFKA = "topic"

    // Payload sent to the topic; must match the assertions below
    // (assumes the consumer maps it to a Foo with text == "text" — TODO confirm).
    private val PAYLOAD = """{"text": "text"}"""

    @Autowired
    private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

    @Autowired
    private lateinit var listener: MyCustomConsumerForTest

    // Independent test-side consumer that records everything published to the topic.
    private lateinit var container: KafkaMessageListenerContainer<String, String>
    private lateinit var records: BlockingQueue<ConsumerRecord<String, String>>

    @SpyBean
    private lateinit var service: Service

    @BeforeAll
    fun setUp() {
        val configs = HashMap(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker))
        val consumerFactory = DefaultKafkaConsumerFactory(configs, StringDeserializer(), StringDeserializer())
        val containerProperties = ContainerProperties(TOPIC_KAFKA)
        container = KafkaMessageListenerContainer(consumerFactory, containerProperties)
        records = LinkedBlockingQueue()
        container.setupMessageListener(MessageListener<String, String> { records.add(it) })
        container.start()
        // Block until the test consumer has its partitions assigned, otherwise
        // the first produced message can be missed.
        embeddedKafkaBroker.partitionsPerTopic.let { ContainerTestUtils.waitForAssignment(container, it) }
    }

    @AfterAll
    fun tearDown() {
        logger.info("Stop Listener")
        container.stop()
    }

    @Test
    fun kafkaSetup_withTopic_ensureSendMessageIsReceived() {
        // Arrange
        val configs = HashMap(KafkaTestUtils.producerProps(embeddedKafkaBroker))
        val producer = DefaultKafkaProducerFactory(configs, StringSerializer(), StringSerializer()).createProducer()

        // Act
        producer.send(ProducerRecord<String, String>(TOPIC_KAFKA, PAYLOAD))
        producer.flush()

        // Assert
        // Original polled for 1 MILLISECOND — far too short and flaky; give the
        // broker a realistic window, and fail loudly if the latch times out
        // (the original discarded latch.await's boolean result).
        val singleRecord = records.poll(10, TimeUnit.SECONDS)
        assert(listener.latch.await(10, TimeUnit.SECONDS)) { "listener was never invoked" }
        assert(singleRecord != null)
        verify(service, times(1)).validate(anyOrNull())
        argumentCaptor<Foo>().apply {
            verify(service, times(1)).somefunction(capture())
            Assertions.assertEquals(1, allValues.size)
            Assertions.assertEquals("text", firstValue.text)
        }
    }
}
和我的 kafkaconfig
@Configuration
@EnableKafka
class KafkaConfig {

    @Value("\${kafka.bootstrap-servers}")
    private lateinit var bootstrapAddress: String

    @Value("\${kafka.consumer.group-id}")
    private lateinit var groupId: String

    /**
     * Consumer factory using string (de)serialization, the configured broker
     * address and group id, and `latest` offset reset.
     */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val settings = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress,
            ConsumerConfig.GROUP_ID_CONFIG to groupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
        )
        return DefaultKafkaConsumerFactory(settings)
    }

    /**
     * Listener container factory backed by [consumerFactory]; missing topics
     * are not fatal so the context can start before topics exist.
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = this@KafkaConfig.consumerFactory()
            containerProperties.isMissingTopicsFatal = false
        }
}