0

我正在尝试在使用 @KafkaListener 创建的 springboot 中测试侦听器但侦听器总是在 localhost:9092 上侦听,而不是使用这个 embededKafka

我的听众看起来像这样:


@Component
@Slf4j
class SomeListener {
    private final List<String> receivedMessages = new ArrayList<>();

    @KafkaListener(topics = "some-ultra-cool-topic")
    public void onKafkaMessage(String theMessage) {
        log.info("Message received {}", theMessage);
        receivedMessages.add(theMessage);
    }

    Collection<String> getAll() {
        return unmodifiableCollection(receivedMessages);
    }
}

像这样的 spock 测试:

@SpringBootTest
@EmbeddedKafka
@TestPropertySource(properties = ['spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}', 'spring.kafka.consumer.auto-offset-reset=earliest'])
class SomeListenerTest extends Specification {
    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker

    @Autowired
    SomeListener someListener

    void 'should receive message'() {
        given:
            def sender = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(KafkaTestUtils.producerProps(embeddedKafkaBroker)))

        when:
            sender.send('some-ultra-cool-topic', 'first message content')
        then:
            someListener.all.size() == 1
    }

}

我的 application.yaml 没有配置引导服务器 - 所以它是 spring-boot 的完全默认设置。

我可以在日志中看到生产者正在向代理发送消息(它每次都在不同的随机端口上启动)。但是侦听器总是尝试连接到 localhost:9092 上的代理

如何配置它以使用这个嵌入式?

4

1 回答 1

0

感谢@sawim 的提示

实际问题在测试中。我最终用 lib org.awaitility:awaitility 做了这个测试

        then:
        waitAtMost(5, SECONDS)
                .untilAsserted({ ->
                    assertThat(personFacade.findAll(), hasSize(1))
                })

第一个示例中的配置有效,但是在启动期间我可以看到 kafka-logs 试图连接到 localhost:9200 - 似乎我们可以忽略它

于 2020-08-25T09:14:03.823 回答