我正在使用带有 spring boot 和 junit 5 的 Kafka 嵌入式代理。我已经能够成功连接并看到嵌入式代理正在运行。
在我的设置方法中,我将一些消息注入到我的实际代码监听的队列中
@BeforeAll
public void setup() {
// code to play down some messages to topic X
}
尽管设置方法中没有遇到错误,但我的消费者/听众永远不会被触发
我的消费者设置如下
class Consumer() {
@KafkaListener(topics="X",
groupId ="...",
containerFactory="my-container-factory"
)
public void consume(ConsumerRecord<String,byte[] rec) {
//logic to handle
logger.info("Print rec : "+rec)
}
}
在其他地方我设置了我的 ListenerContainerFactory ,名称如下
@Bean(name="my-container-factory")
public KafkaContainerListenerFactory<String,byte[]> factory() {
}
这可能有什么问题?我在测试用例中的断言失败,此外,如果我的消费方法被调用,我看不到应该打印的日志语句。
我有一种感觉,自动配置是由于@SpringBootTest
并且@EmbeddedKafka
正在设置其他一些侦听器容器工厂,所以我的@KafkaListener
注释可能是错误的。我知道,它有点模糊,但你能告诉我看什么/在哪里看吗?如果我作为消费者运行,@SpringBootApplication
我的消费者会从实际队列中拉入消息。所以我的实际应用程序没有问题。它的测试没有执行按照预期。
请帮忙。
编辑 1:我已经spring.kafka.consumer.auto-offset-reset=earliest
在我的 yml 文件中设置了。