I run the following test containers in my test code.

@Testcontainers
public class TestEnvironmentSupport {
    static String version = "5.4.0";
    static DockerImageName kafkaImage = DockerImageName.parse("confluentinc/cp-server").withTag(version);
    static DockerImageName zookeeperImage = DockerImageName.parse("confluentinc/cp-zookeeper").withTag(version);
    static DockerImageName schemaRegistryImage = DockerImageName.parse("confluentinc/cp-schema-registry").withTag(version);

    static Network network = Network.newNetwork();

    @Container
    static GenericContainer zookeeper = new GenericContainer<>(zookeeperImage)
        .withNetwork(network)
        .withCreateContainerCmdModifier(cmd -> cmd.withHostName("zookeeper"))
        .withExposedPorts(2181)
        .withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
        .withEnv("ZOOKEEPER_TICK_TIME", "2000");

    @Container
    static GenericContainer kafka = new GenericContainer<>(kafkaImage)
        .withNetwork(network)
        .withCreateContainerCmdModifier(cmd -> cmd.withHostName("kafka"))
        .withExposedPorts(9092)
        .dependsOn(zookeeper)
        .withEnv("KAFKA_BROKER_ID", "1")
        .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181")
        .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT")
        .withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092")
        .withEnv("KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL", "schema-registry:8081");

    @Container
    static GenericContainer schemaRegistry = new GenericContainer<>(schemaRegistryImage)
        .withNetwork(network)
        .withCreateContainerCmdModifier(cmd -> cmd.withHostName("schema-registry"))
        .withExposedPorts(8081)
        .dependsOn(zookeeper, kafka)
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", "zookeeper:2181");

    @Test
    void test() {
        assertTrue(zookeeper.isRunning());
        assertTrue(kafka.isRunning());
        assertTrue(schemaRegistry.isRunning());
    }
}

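This works fine. The problem appears when I run a Spring Boot test with the Testcontainers configuration above: Testcontainers maps the broker port to a dynamically chosen host port, but NetworkClient keeps trying to reach the broker at localhost:9092, even though I override the properties dynamically in my @SpringBootTest class, as shown below.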

@DynamicPropertySource
static void testcontainerProperties(final DynamicPropertyRegistry registry) {
    var bootstrapServers = kafka.getHost() + ":" + kafka.getMappedPort(9092);
    var schemaRegistryUrl = "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081);
    registry.add("spring.cloud.stream.kafka.binder.brokers", () -> bootstrapServers);
    registry.add("bootstrap.servers", () -> bootstrapServers);
    registry.add("schema.registry.url", () -> schemaRegistryUrl);
    registry.add("spring.cloud.stream.kafka.default.consumer.configuration.schema.registry.url", () -> schemaRegistryUrl);
}

Below is the AdminClientConfig log at startup. It shows bootstrap.servers = [localhost:56013], which is the port that Testcontainers bound dynamically.

2021-02-21 20:28:52.291  INFO 78241 --- [    Test worker] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
        bootstrap.servers = [localhost:56013]
        client.dns.lookup = use_all_dns_ips

Even with this setting, it still tries to connect to localhost:9092, as shown below.

2021-02-21 20:28:52.457  INFO 78241 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1613906932454
2021-02-21 20:28:53.095  WARN 78241 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-02-21 20:28:53.202  WARN 78241 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-02-21 20:28:53.407  WARN 78241 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Below is the docker ps output while the Spring Boot test is running.

CONTAINER ID   IMAGE                                   COMMAND                  CREATED              STATUS              PORTS                                                                       NAMES
e340d9e15fe4   confluentinc/cp-schema-registry:5.4.0   "/etc/confluent/dock…"   46 seconds ago       Up 46 seconds       0.0.0.0:56014->8081/tcp                                                     optimistic_joliot
ad3bf06df4b3   confluentinc/cp-server:5.4.0            "/etc/confluent/dock…"   55 seconds ago       Up 54 seconds       0.0.0.0:56013->9092/tcp                                                     infallible_brown
f7fa5f4ae23c   confluentinc/cp-zookeeper:5.4.0         "/etc/confluent/dock…"   About a minute ago   Up 59 seconds       0.0.0.0:56012->2181/tcp, 0.0.0.0:56011->2888/tcp, 0.0.0.0:56010->3888/tcp   agitated_leavitt
b1c036cdf00b   testcontainers/ryuk:0.3.0               "/app"                   About a minute ago   Up About a minute   0.0.0.0:56009->8080/tcp                                                     testcontainers-ryuk-68190eaa-8513-4dd8-ab67-175275f15a82

I also tried running Testcontainers with the Docker Compose module, but it has the same problem. What am I doing wrong? Please help.

1 Answer

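It is trying to connect to localhost:9092 because you connected through the port of the advertised PLAINTEXT_HOST listener, and localhost:9092 is the address that listener returns to the client. The bootstrap address is only used for the first connection; after that the client uses whatever address the broker advertises. You don't need to advertise two listeners for a test, so try using kafka:29092 directly instead of calling the mapped-port method. Also, unless you specifically need server-side schema validation, you only need the confluentinc/cp-kafka image.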
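To make the mechanism concrete, here is a minimal sketch in plain Java. It is a simplified model, not Kafka's actual client code: the class AdvertisedListenerDemo and its methods are hypothetical names made up for illustration, and it assumes that a connection arriving on container port 9092 is accepted by the PLAINTEXT_HOST listener, as the configuration in the question suggests.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified model (not Kafka's real implementation) of which address a
 * client ends up using after its bootstrap connection.
 */
class AdvertisedListenerDemo {

    /** Parses "NAME://host:port,NAME2://host:port" into listener name -> "host:port". */
    static Map<String, String> parseAdvertisedListeners(String value) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    /**
     * The bootstrap address is deliberately ignored here: it is only used to
     * fetch metadata. Every later connection goes to the address advertised
     * for the listener that accepted the bootstrap connection.
     */
    static String addressAfterMetadata(String bootstrapAddress,
                                       String acceptingListener,
                                       String advertisedListeners) {
        return parseAdvertisedListeners(advertisedListeners).get(acceptingListener);
    }

    public static void main(String[] args) {
        // Value of KAFKA_ADVERTISED_LISTENERS from the question.
        String advertised = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092";
        // Bootstrap through the mapped host port seen in the docker ps output.
        System.out.println(addressAfterMetadata("localhost:56013", "PLAINTEXT_HOST", advertised));
        // prints "localhost:9092"
    }
}
```

In this model the dynamically mapped port only matters for the first request, which is why overriding bootstrap.servers alone does not change the address that NetworkClient reports in the warnings.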

The spring-kafka embedded broker should also work for testing, so you may not need Testcontainers at all.

Answered 2021-02-21T18:37:43.140