1

我有一个 ELKK docker compose stack,我正在尝试将一个 kafka 客户端(kafka js)连接到 kafka,但没有运气。

码头工人-compose.yml

version: "3.7"
services:
zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    container_name: zookeeper
    restart: unless-stopped
    ports:
        - "2181:2181"
    environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        TZ: "${TZ-Europe/Berlin}"
    healthcheck:
        test: "echo stat | nc localhost $$ZOOKEEPER_CLIENT_PORT"
        start_period: 1m

kafka:
    image: confluentinc/cp-kafka:5.4.1
    container_name: kafka
    restart: unless-stopped
    depends_on:
        - filebeat
        - zookeeper
    ports:
        - "29092:29092"
    environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_DELETE_TOPIC_ENABLE: "true"
        TZ: "${TZ-Europe/Berlin}"
    healthcheck:
        test: [ "CMD", "nc", "-z", "localhost", "9092" ]
        start_period: 1m

kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:5.4.1
    container_name: kafka-rest-proxy
    restart: unless-stopped
    depends_on:
        - zookeeper
        - kafka
    ports:
        - "8082:8082"
    environment:
        KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
        KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_REST_HOST_NAME: kafka-rest-proxy
        KAFKA_REST_LISTENERS: http://0.0.0.0:8082
        KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
        KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 30000
        TZ: "${TZ-Europe/Berlin}"
    healthcheck:
        test: "curl -f http://localhost:8082 || exit 1"
        start_period: 1m

kafka-topics-ui:
    image: landoop/kafka-topics-ui:0.9.4
    container_name: kafka-topics-ui
    restart: unless-stopped
    depends_on:
        - kafka-rest-proxy
    ports:
        - "8085:8000"
    environment:
        KAFKA_REST_PROXY_URL: http://kafka-rest-proxy:8082
        PROXY: "true"
    healthcheck:
        test: "wget --quiet --tries=1 --spider http://localhost:8000 || exit 1"
        start_period: 1m

elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.4.2
    container_name: elasticsearch
    restart: unless-stopped
    ports:
        - "9200:9200"
        - "9300:9300"
    environment:
        cluster.name: docker-es-cluster
        discovery.type: single-node
        bootstrap.memory_lock: "true"
        ES_JAVA_OPTS: "-Xms512m -Xmx512m"
    cap_add:
        - ALL
    privileged: true
    volumes:
        - esdata:/usr/share/elasticsearch/data
    ulimits:
        memlock:
            soft: -1
            hard: -1
    healthcheck:
        test: "curl -f http://localhost:9200 || exit 1"
        start_period: 1m
    depends_on:
        - logstash

zipkin:
    container_name: zipkin
    image: openzipkin/zipkin:2.20.2
    restart: unless-stopped
    ports:
        - "9411:9411"
    healthcheck:
        test: [ "CMD", "nc", "-z", "localhost", "9411" ]
        start_period: 1m

kafka-manager:
    container_name: kafka-manager
    image: hlebalbau/kafka-manager:3.0.0.4
    restart: unless-stopped
    depends_on:
        - zookeeper
    ports:
        - "9000:9000"
    environment:
        ZK_HOSTS: zookeeper:2181
        APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null
    healthcheck:
        test: "curl -f http://localhost:9000 || exit 1"
        start_period: 1m

logstash:
    image: docker.elastic.co/logstash/logstash:5.3.0
    container_name: logstash
    volumes:
        - ./logstash-config/config/logstash.yml:/usr/share/logstash/config/logstash.yml
        - ./logstash-config/pipeline:/usr/share/logstash/pipeline
    depends_on:
        - kafka

kibana:
    image: docker.elastic.co/kibana/kibana-oss:6.4.2
    container_name: kibana
    environment:
        elasticsearch.url: http://localhost:9200
    depends_on:
        - elasticsearch
    ports:
        - "5601:5601"
    ulimits:
        nproc: 65535
    memlock:
        soft: -1
        hard: -1
    healthcheck:
        test: "curl -f http://localhost:5601 || exit 1"
        start_period: 1m
    cap_add:
        - ALL

filebeat:
    hostname: filebeat
    user: root
    container_name: filebeat
    build:
        context: ./filebeat
    volumes:
        - filebeat_data:/usr/share/filebeat/data:rw
        - /var/lib/docker/containers:/usr/share/filebeat/dockerlogs/data:ro
        - /var/run/docker.sock:/var/run/docker.sock

visual-app:
    container_name: visual-app
    build:
        context: ./
        dockerfile: visual-dockerfile
    volumes:
        - './app:/app'
        - '/app/node_modules'
    ports:
        - '8080:8080'
    depends_on:
        - kafka

volumes:
    filebeat_data:
    esdata:

消费者.js

const { Kafka, logLevel } = require('kafkajs')

const host = 'kafka:9092'

const kafka = new Kafka({
    logLevel: logLevel.INFO,
    brokers: [`${host}`],
    clientId: 'example-consumer',
})

const topic = 'log_stream'
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
    await consumer.connect()
    await consumer.subscribe({ topic, fromBeginning: true })
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
        const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
        console.log(`- ${prefix} ${message.key}#${message.value}`)
        },
    })
}

run().catch(e => console.error(`[example/consumer] ${e.message}`, e))

我得到的错误如下。

{"level":"ERROR","timestamp":"2021-02-23T22:04:33.736Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka","broker":"kafka:9092","clientId":"example-consumer","stack":"Error: getaddrinfo ENOTFOUND kafka\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:67:26)"}
{"level":"ERROR","timestamp":"2021-02-23T22:04:33.740Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: getaddrinfo ENOTFOUND kafka","retryCount":0,"retryTime":314}
[example/consumer] Connection error: getaddrinfo ENOTFOUND kafka KafkaJSNonRetriableError
Caused by: KafkaJSConnectionError: Connection error: getaddrinfo ENOTFOUND kafka
    at Socket.onError (/app/node_modules/kafkajs/src/network/connection.js:152:23)
    at Socket.emit (events.js:315:20)
    at emitErrorNT (internal/streams/destroy.js:106:8)
    at emitErrorCloseNT (internal/streams/destroy.js:74:3)
    at processTicksAndRejections (internal/process/task_queues.js:80:21) {
        name: 'KafkaJSNumberOfRetriesExceeded',
        retriable: false,
        helpUrl: undefined,
        originalError: KafkaJSConnectionError: Connection error: getaddrinfo ENOTFOUND kafka
            at Socket.onError (/app/node_modules/kafkajs/src/network/connection.js:152:23)
            at Socket.emit (events.js:315:20)
            at emitErrorNT (internal/streams/destroy.js:106:8)
            at emitErrorCloseNT (internal/streams/destroy.js:74:3)
            at processTicksAndRejections (internal/process/task_queues.js:80:21) {
                retriable: true,
                helpUrl: undefined,
                broker: 'kafka:9092',
                code: 'ENOTFOUND'
            },
        retryCount: 5,
        retryTime: 8114
}

例如,我也尝试过网络

networks:
    kafkanet:
        driver: bridge

已分配给所有服务,但没有任何改变。

我也尝试过hostname: kafka to kafka service,但也没有任何改变。

但是,我可以使用localhost:29092从外部客户端成功连接到代理。

那么,我做错了什么?

4

1 回答 1

0

您的代码中有两个 KAFKA_ADVERTISED_LISTENERS

尝试连接到端口 29092 它对我有用

于 2022-01-12T23:01:13.030 回答