2

我使用 EmbededKafka 实现了一堆集成测试,以测试我们的一个使用 spring-kafka 框架运行的 Kafka 流应用程序。

流应用程序正在从 Kafka 主题中读取消息,将其存储到内部状态存储中,进行一些转换并将其发送到另一个微服务到请求的主题中。当响应返回响应主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的下游系统之一,每个系统都有自己的主题。

集成测试只是执行业务条件的各种排列。

最初,测试被分成不同的班级。在运行构建时,一个类中的测试与另一类中的测试发生冲突,并出现一些冲突异常。我没有花太多时间在这上面,只是把所有的测试都移到了同一个班级里。这解决了我从 gradle build 或 intelij EDI 通过的所有测试的问题。

这是测试:

package au.nab.tlm.streams.integration;

import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
        ports = 9092,
        partitions = 1,
        topics = {
                "topic-1.v1",
                "topic-2.v1",
                "topic-3.v1",
                "topic-4.v1",
                "topic-5.v1",
                "topic-6.v1",
        },
        brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Autowired
    EntitlementsCheckSerDes appSerDes;

    @Test
    public void test_1() {
    }

    @Test
    public void test_2() {
    }

    @Test
    public void test_3() {
    }

    @Test
    public void test_4() {
    }

    @Test
    public void test_5() {
    }

    @TestConfiguration
    public static class TestKafkaConfig {
        @Bean
        EntitlementsCheckSerDes appSerDes() {
            return new MockEntitlementsCheckSerDes();
        }
    }
}

对结果感到满意,我推送了我的更改,只是为了注意到构建在我们的 CI 服务器上失败。再次运行它,这次又失败了,失败与第一次不同。我让一位同事看了一下,他也有与 CI 服务器类似的故障经历。我在我的机器上运行了至少 20 次构建,它总是通过。对我的同事进行一项一项的测试也总是通过。

我们得到的最常见的异常是主题 xyz 已经存在,但偶尔会有一些其他异常表明集群不可能是喜欢的或类似的。所有这些异常都向我们表明,在前一个测试中使用的嵌入式 Kafka 在下一个测试开始之前没有完全关闭,尽管使用了DirtiesContext注释。第一个运行的测试总是通过。

我们俩都花了一整天的时间脱掉我们的头发,这是没有办法让它工作的。我们尝试了谷歌带我们去的任何地方,完全没有运气。最后,我们在测试类中留下了唯一的一个测试场景(交互次数最多的一个)并禁用了其余的。

显然,这不是一个可以接受的永久解决方案,我真的很想了解我们做错了什么以及如何解决它。

预先感谢您的意见。

4

1 回答 1

3

不要使用固定端口ports = 9092,——默认情况下,嵌入式 kafka 将侦听操作系统选择的随机端口。

您应该将它用于您的测试用例。

您可以通过调用获取代理地址this.kafkaBroker.getBrokersAsString()

于 2020-06-16T14:31:45.933 回答