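I am writing a Kafka integration test for a simple Spring Boot application. The application simply publishes to a Kafka topic.

I am using an embedded Kafka instance for the test. The test passes when I run it from IntelliJ but fails when I run it through Gradle. It looks like the latch count never reaches 0, and the test eventually times out.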
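A side note on the hang itself: a `CountDownLatch` awaited without a timeout blocks until something external stops the test, which hides whether the record was simply never delivered. Below is a minimal JDK-only sketch of a bounded wait. The class `LatchTimeoutSketch` and the helper `awaitDelivery` are hypothetical names used for illustration and are not part of the code in this question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {

    // Hypothetical helper: waits up to timeoutMillis and returns true only
    // if the latch count reached zero within that time.
    static boolean awaitDelivery(CountDownLatch latch, long timeoutMillis)
            throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: nothing ever counts down, as when a listener never receives a record.
        CountDownLatch neverSignalled = new CountDownLatch(1);
        boolean delivered = awaitDelivery(neverSignalled, 200);
        System.out.println("delivered=" + delivered + ", count=" + neverSignalled.getCount());

        // Case 2: another thread counts down, as a listener callback would.
        CountDownLatch signalled = new CountDownLatch(1);
        new Thread(signalled::countDown).start();
        System.out.println("delivered=" + awaitDelivery(signalled, 2000));
    }
}
```

In a test, asserting on the boolean returned by `await(timeout, unit)` turns an indefinite hang into a quick, readable failure.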

Producer config:

public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> articleProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> articleKafkaTemplate() {
        return new KafkaTemplate<>(articleProducerFactory());
    }
}

Producer:

public class KafkaProducer {

    @Value(value = "${kafka.topic-name}")
    String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) throws KafkaPublishException {
        try {
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, message);
            future.get();
        } catch (Exception e) {
            throw new KafkaPublishException(e.getMessage());
        }

    }

    public String getTopicName() {
        return topicName;
    }
}

Consumer:

@Component
public class KafkaConsumerHelper {
    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {

        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    private void setPayload(String payload) {
        this.payload = payload;
    }
}

Test:

@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumerHelper consumer;

    @Value("${test.topic}")
    private String topic;


    @Test
    public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
            throws Exception {

        String message = createArticle();

        producer.sendMessage(message, topic);
        consumer.getLatch().await();

        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString(message));
    }
}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
test:
  topic: embedded-test-topic
  partitions-number: 1
  replication-factor: 1

Any idea what the problem is?

1 Answer

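For anyone looking at this question in the future: my problem was that I was not using @EmbeddedKafka correctly.

The fix was to add the bootstrapServersProperty = "spring.kafka.bootstrap-servers" attribute to the @EmbeddedKafka annotation.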

@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
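To illustrate why this helps: the embedded broker exposes its address through a system property (`spring.embedded.kafka.brokers` by default), and `bootstrapServersProperty` changes the name of that property so that the clients pick up the broker's real address. The following is only a JDK-only model of that idea, not Spring's implementation. `BootstrapPropertySketch`, `startFakeBroker`, and `resolveBootstrapServers` are hypothetical names, and a plain `ServerSocket` stands in for the broker.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class BootstrapPropertySketch {

    // Hypothetical stand-in for an embedded broker: binds to a free port and
    // publishes its real address under the given system property name.
    static ServerSocket startFakeBroker(String addressProperty) throws IOException {
        ServerSocket socket = new ServerSocket(0); // 0 lets the OS pick a free port
        System.setProperty(addressProperty, "localhost:" + socket.getLocalPort());
        return socket;
    }

    // Hypothetical stand-in for the client side: reads the property if it is
    // set, otherwise falls back to a fixed default address.
    static String resolveBootstrapServers(String property, String fallback) {
        return System.getProperty(property, fallback);
    }

    public static void main(String[] args) throws IOException {
        String property = "spring.kafka.bootstrap-servers";
        try (ServerSocket broker = startFakeBroker(property)) {
            // Both lines print the same address because the client reads
            // the property that the fake broker just published.
            System.out.println("broker listens on  localhost:" + broker.getLocalPort());
            System.out.println("client connects to "
                    + resolveBootstrapServers(property, "localhost:9092"));
        } finally {
            System.clearProperty(property);
        }
    }
}
```

When the broker publishes its address under the same property name the client reads, no port has to be hard-coded on either side, so the test does not depend on port 9092 being free on the machine running the build.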

More info in the Kafka docs.

Answered 2021-09-03T07:48:27.447