我正在为一个简单的 Spring Boot 应用程序编写一个 Kafka 集成测试。应用程序简单地发布到 Kafka 主题。
我正在使用嵌入式 Kafka 实例进行测试。当通过 Intellij 运行时,测试工作得很好,但当我通过 gradle 运行它时失败。看起来latch
倒计时永远不会达到 0,并且测试最终会超时。
生产者配置:
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrap-address}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> articleProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> articleKafkaTemplate() {
return new KafkaTemplate<>(articleProducerFactory());
}
}
制片人:
public class KafkaProducer {
@Value(value = "kafka.topic-name")
String topicName;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topic) throws KafkaPublishException {
try {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, message);
future.get();
} catch (Exception e) {
throw new KafkaPublishException(e.getMessage());
}
}
public String getTopicName() {
return topicName;
}
消费者:
@Component
public class KafkaConsumerHelper {
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
private void setPayload(String payload) {
this.payload = payload;
}
}
测试:
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {
@Autowired
private KafkaProducer producer;
@Autowired
private KafkaConsumerHelper consumer;
@Value("${test.topic}")
private String topic;
@Test
public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
throws Exception {
String message = createArticle();
producer.sendMessage(message, topic);
consumer.getLatch().await();
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString(message));
}
应用程序.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
group-id: my-id
test:
topic: embedded-test-topic
partitions-number: 1
replication-factor: 1
知道是什么问题吗?