0

为了测试我们的 Spring Boot 应用程序在 kafka 集群尚未启动时如何处理它,我想在应用程序启动后的某个时间在 junit 测试中启动一个嵌入式 kafka 集群。我怎么能接近这个?据我了解,spring-kafka-test@EmbeddedKafka在创建 SpringBootTest 的应用程序上下文之前启动集群。有没有办法配置那个时间?

4

1 回答 1

0

当定义为 bean(通过@EmbeddedKafka)或 JUnit 条件(再次通过@EmbeddedKafka- 当没有测试 Spring ApplicationContext 时)时,代理在afterPropertiesSet().

您应该能够手动创建代理并afterPropertiesSet()在准备好时调用。

这是来自 JUnit5 的代码EmbeddedkafkaCondition

@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
    EmbeddedKafkaBroker broker;
    int[] ports = setupPorts(embedded);
    broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(),
                    embedded.partitions(), embedded.topics())
            .zkPort(embedded.zookeeperPort())
            .kafkaPorts(ports)
            .zkConnectionTimeout(embedded.zkConnectionTimeout())
            .zkSessionTimeout(embedded.zkSessionTimeout());
    Properties properties = new Properties();

    for (String pair : embedded.brokerProperties()) {
        if (!StringUtils.hasText(pair)) {
            continue;
        }
        try {
            properties.load(new StringReader(pair));
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
                    ex);
        }
    }
    if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
        Resource propertiesResource = new PathMatchingResourcePatternResolver()
                .getResource(embedded.brokerPropertiesLocation());
        if (!propertiesResource.exists()) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource
                            + "]: resource does not exist.");
        }
        try (InputStream in = propertiesResource.getInputStream()) {
            Properties p = new Properties();
            p.load(in);
            p.forEach(properties::putIfAbsent);
        }
        catch (IOException ex) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource + "]", ex);
        }
    }
    broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
    if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
        broker.brokerListProperty(embedded.bootstrapServersProperty());
    }
    broker.afterPropertiesSet();
    return broker;
}
于 2021-09-14T13:05:39.143 回答