我有这个简单的 Spring Boot 应用程序供您考虑:
@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {
public static final String MY_TOPIC = "myTopic";
public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();
public static void main(String[] args) {
SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
}
@KafkaListener(topics = MY_TOPIC)
public void listener(String payload) {
this.kafkaMessages.add(payload);
}
}
application.properties
: _
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
并测试:
@RunWith(SpringRunner.class)
@SpringBootTest(properties =
"spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {
@Autowired
private KafkaTemplate<Object, String> kafkaTemplate;
@Autowired
private SpringBootEmbeddedKafkaApplication kafkaApplication;
@Test
public void testListenerWithEmbeddedKafka() throws InterruptedException {
String testMessage = "foo";
this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);
assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
}
}
注意spring.kafka.consumer.auto-offset-reset=earliest
让消费者从分区的开头读取。
在测试中应用的另一个重要选项是:
@SpringBootTest(properties =
"spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka
填充spring.embedded.kafka.brokers
系统属性并使 Spring Boot 自动配置知道我们需要将其值复制到配置spring.kafka.bootstrapServers
属性。
或者根据我们的文档的另一种选择:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}