1

我正在为我的 Spring Boot 应用程序编写 Junit 测试用例(使用 @EmbeddedKafka),该应用程序广泛使用 Spring-kafka 与其他服务和其他操作进行通信。

一个典型的案例是从 kafka 中删除数据(我们在 kafka 中推送消息)。

目前在 delete() 方法中,我们首先检查 kafka 中是否存在任何请求删除的消息。然后我们在 Kafka 中为该消息键推送null

为上述方法逻辑编写 Junit 的步骤。

@Test
public void test(){
   //Push a message to Kafka (id=1234)
   //call test method service.delete(1234);
       //internally service.delete(1234) checks/validate whether message exists in kafka and then push null to delete topic.
  //check delete topic for delete message received.
  // Assertions
}

这里的问题是 Kafka 总是抛出 message not found 异常。在 service.delete() 方法中。

在控制台中检查日志时。我发现我的生产者配置为 kafka 使用不同的端口,而消费者配置使用不同的端口。

我不确定我是否遗漏了一些细节,或者这种行为的原因是什么。任何帮助将不胜感激。

4

1 回答 1

1

我有这个简单的 Spring Boot 应用程序供您考虑:

@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {

    public static final String MY_TOPIC = "myTopic";

    public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
    }

    @KafkaListener(topics = MY_TOPIC)
    public void listener(String payload) {
        this.kafkaMessages.add(payload);
    }

}

application.properties: _

spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest

并测试:

@RunWith(SpringRunner.class)
@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {

    @Autowired
    private KafkaTemplate<Object, String> kafkaTemplate;

    @Autowired
    private SpringBootEmbeddedKafkaApplication kafkaApplication;

    @Test
    public void testListenerWithEmbeddedKafka() throws InterruptedException {
        String testMessage = "foo";
        this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);

        assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
    }

}

注意spring.kafka.consumer.auto-offset-reset=earliest让消费者从分区的开头读取。

在测试中应用的另一个重要选项是:

@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")

@EmbeddedKafka填充spring.embedded.kafka.brokers系统属性并使 Spring Boot 自动配置知道我们需要将其值复制到配置spring.kafka.bootstrapServers属性。

或者根据我们的文档的另一种选择:

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
于 2019-01-31T16:45:54.677 回答