I have an ELKK (Elasticsearch, Logstash, Kibana, Kafka) docker-compose stack, and I am trying to connect a Kafka client (KafkaJS) to Kafka, but with no luck.
docker-compose.yml
version: "3.7"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      TZ: "${TZ-Europe/Berlin}"
    healthcheck:
      test: "echo stat | nc localhost $$ZOOKEEPER_CLIENT_PORT"
      start_period: 1m
  kafka:
    image: confluentinc/cp-kafka:5.4.1
    container_name: kafka
    restart: unless-stopped
    depends_on:
      - filebeat
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      TZ: "${TZ-Europe/Berlin}"
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "9092" ]
      start_period: 1m
  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:5.4.1
    container_name: kafka-rest-proxy
    restart: unless-stopped
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8082:8082"
    environment:
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 30000
      TZ: "${TZ-Europe/Berlin}"
    healthcheck:
      test: "curl -f http://localhost:8082 || exit 1"
      start_period: 1m
  kafka-topics-ui:
    image: landoop/kafka-topics-ui:0.9.4
    container_name: kafka-topics-ui
    restart: unless-stopped
    depends_on:
      - kafka-rest-proxy
    ports:
      - "8085:8000"
    environment:
      KAFKA_REST_PROXY_URL: http://kafka-rest-proxy:8082
      PROXY: "true"
    healthcheck:
      test: "wget --quiet --tries=1 --spider http://localhost:8000 || exit 1"
      start_period: 1m
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.4.2
    container_name: elasticsearch
    restart: unless-stopped
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      cluster.name: docker-es-cluster
      discovery.type: single-node
      bootstrap.memory_lock: "true"
      ES_JAVA_OPTS: "-Xms512m -Xmx512m"
    cap_add:
      - ALL
    privileged: true
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test: "curl -f http://localhost:9200 || exit 1"
      start_period: 1m
    depends_on:
      - logstash
  zipkin:
    container_name: zipkin
    image: openzipkin/zipkin:2.20.2
    restart: unless-stopped
    ports:
      - "9411:9411"
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "9411" ]
      start_period: 1m
  kafka-manager:
    container_name: kafka-manager
    image: hlebalbau/kafka-manager:3.0.0.4
    restart: unless-stopped
    depends_on:
      - zookeeper
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null
    healthcheck:
      test: "curl -f http://localhost:9000 || exit 1"
      start_period: 1m
  logstash:
    image: docker.elastic.co/logstash/logstash:5.3.0
    container_name: logstash
    volumes:
      - ./logstash-config/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./logstash-config/pipeline:/usr/share/logstash/pipeline
    depends_on:
      - kafka
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:6.4.2
    container_name: kibana
    environment:
      elasticsearch.url: http://localhost:9200
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test: "curl -f http://localhost:5601 || exit 1"
      start_period: 1m
    cap_add:
      - ALL
  filebeat:
    hostname: filebeat
    user: root
    container_name: filebeat
    build:
      context: ./filebeat
    volumes:
      - filebeat_data:/usr/share/filebeat/data:rw
      - /var/lib/docker/containers:/usr/share/filebeat/dockerlogs/data:ro
      - /var/run/docker.sock:/var/run/docker.sock
  visual-app:
    container_name: visual-app
    build:
      context: ./
      dockerfile: visual-dockerfile
    volumes:
      - './app:/app'
      - '/app/node_modules'
    ports:
      - '8080:8080'
    depends_on:
      - kafka
volumes:
  filebeat_data:
  esdata:
consumer.js
const { Kafka, logLevel } = require('kafkajs')

const host = 'kafka:9092'

const kafka = new Kafka({
  logLevel: logLevel.INFO,
  brokers: [`${host}`],
  clientId: 'example-consumer',
})

const topic = 'log_stream'
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
      console.log(`- ${prefix} ${message.key}#${message.value}`)
    },
  })
}

run().catch(e => console.error(`[example/consumer] ${e.message}`, e))
The errors I get are as follows.
{"level":"ERROR","timestamp":"2021-02-23T22:04:33.736Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka","broker":"kafka:9092","clientId":"example-consumer","stack":"Error: getaddrinfo ENOTFOUND kafka\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:67:26)"}
{"level":"ERROR","timestamp":"2021-02-23T22:04:33.740Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: getaddrinfo ENOTFOUND kafka","retryCount":0,"retryTime":314}
[example/consumer] Connection error: getaddrinfo ENOTFOUND kafka KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionError: Connection error: getaddrinfo ENOTFOUND kafka
      at Socket.onError (/app/node_modules/kafkajs/src/network/connection.js:152:23)
      at Socket.emit (events.js:315:20)
      at emitErrorNT (internal/streams/destroy.js:106:8)
      at emitErrorCloseNT (internal/streams/destroy.js:74:3)
      at processTicksAndRejections (internal/process/task_queues.js:80:21) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  originalError: KafkaJSConnectionError: Connection error: getaddrinfo ENOTFOUND kafka
      at Socket.onError (/app/node_modules/kafkajs/src/network/connection.js:152:23)
      at Socket.emit (events.js:315:20)
      at emitErrorNT (internal/streams/destroy.js:106:8)
      at emitErrorCloseNT (internal/streams/destroy.js:74:3)
      at processTicksAndRejections (internal/process/task_queues.js:80:21) {
    retriable: true,
    helpUrl: undefined,
    broker: 'kafka:9092',
    code: 'ENOTFOUND'
  },
  retryCount: 5,
  retryTime: 8114
}
I have also tried adding a network, for example:
networks:
  kafkanet:
    driver: bridge
and assigned it to all of the services, but nothing changed.
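For illustration, the per-service attachment looked roughly like this (shown here for the kafka service; every other service got the same networks entry):

kafka:
  # ...all other settings exactly as in the compose file above...
  networks:
    - kafkanet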
I also tried adding hostname: kafka to the kafka service, but that did not change anything either.
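Concretely, that attempt was just one extra line on the broker service:

kafka:
  hostname: kafka   # in addition to the existing container_name: kafka
  # ...rest unchanged...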
However, I can successfully connect to the broker from an external client using localhost:29092.
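For reference, the working external client is the same consumer code as above with only the broker address changed; localhost:29092 is the PLAINTEXT_HOST listener that the compose file publishes to the host:

const { Kafka, logLevel } = require('kafkajs')

// Identical to consumer.js, except the broker address points at the
// host-mapped PLAINTEXT_HOST listener instead of the internal one.
const kafka = new Kafka({
  logLevel: logLevel.INFO,
  brokers: ['localhost:29092'],
  clientId: 'example-consumer',
})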
So, what am I doing wrong?