0

我正在使用带有 spring boot 和 junit 5 的 Kafka 嵌入式代理。我已经能够成功连接并看到嵌入式代理正在运行。

在我的设置方法中,我将一些消息注入到我的实际代码监听的队列中

    @BeforeAll
    public void setup() {
             // code to play down some messages to topic X
    }

尽管设置方法中没有遇到错误,但我的消费者/听众永远不会被触发

我的消费者设置如下

class Consumer() {

@KafkaListener(topics="X",
groupId ="...",
containerFactory="my-container-factory"
)
public void consume(ConsumerRecord<String,byte[] rec) {
 //logic to handle
   logger.info("Print rec : "+rec)
}

}

在其他地方我设置了我的 ListenerContainerFactory ,名称如下

@Bean(name="my-container-factory")
public KafkaContainerListenerFactory<String,byte[]> factory() {
}

这可能有什么问题?我在测试用例中的断言失败,此外,如果我的消费方法被调用,我看不到应该打印的日志语句。

我有一种感觉,自动配置是由于@SpringBootTest并且@EmbeddedKafka正在设置其他一些侦听器容器工厂,所以我的@KafkaListener注释可能是错误的。我知道,它有点模糊,但你能告诉我看什么/在哪里看吗?如果我作为消费者运行,@SpringBootApplication我的消费者会从实际队列中拉入消息。所以我的实际应用程序没有问题。它的测试没有执行按照预期。

请帮忙。

编辑 1:我已经spring.kafka.consumer.auto-offset-reset=earliest在我的 yml 文件中设置了。

4

0 回答 0