目标:我想分析一个主题的标题,我正在寻找一些直接的方法来查看标题。所以我不想为此开发额外的应用程序或扩展代码。任何用于查看标题的简单工具都会很有用。
从this question我读到kafkacat可以这样
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
所以我希望启动 kafkacat 作为我的 docker-compose 的一部分,我的第一次尝试是
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
kafkacat:
image: confluentinc/cp-kafkacat
command:
- bash
- -c
links:
- broker
我得到了
broker | [2020-12-11 10:46:13,084] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
broker | [2020-12-11 10:46:13,084] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
confluentinc_kafkacat_1 exited with code 2
broker | [2020-12-11 10:51:13,085] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
基于kafkacat教程的第二个试探基础
C:\Users\DEMETRC>docker run --tty confluentinc/cp-kafkacat kafkacat -b localhost:9092 -L
% ERROR: Failed to acquire metadata: Local: Broker transport failure
PS:我的 docker kafka 在没有 docker 网络的 localhost:9092 中可用。在教程的示例中,它的 kafka 在 Docker 网络 docker-compose_default 上的 kafka:29092 中可用
任何线索如何将 kafkacat 添加到我的 docker-compose 中?调查邮件标题的任何其他建议?
*** Robin Moffatt 回答后的第一次编辑
我编辑了 docker-compose 添加了 Robin 的建议
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
kafkacat:
image: edenhill/kafkacat:1.6.0
container_name: kafkacat
links:
- broker
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done
这是我的主题描述
# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demotopic
Created topic demotopic.
# kafka-topics --describe --bootstrap-server localhost:9092 --topic demotopic
Topic: demotopic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: demotopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
#
以及我尝试使用 kafkacat 的错误
/ # kafkacat -b localhost:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p
Offset: %o Headers: %h\n'
%3|1607709622.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1607709623.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
% ERROR: Failed to query metadata for topic demotopic: Local: Broker transport failure
/ #
*** 第二次编辑
现在我可以使用 kafkacat 列出所有主题,但我仍然无法获取消息头
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "demotopic" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-license" with 1 partitions:
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "demotopic" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-license" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-metrics" with 12 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
partition 3, leader 1, replicas: 1, isrs: 1
partition 4, leader 1, replicas: 1, isrs: 1
partition 5, leader 1, replicas: 1, isrs: 1
partition 6, leader 1, replicas: 1, isrs: 1
partition 7, leader 1, replicas: 1, isrs: 1
partition 8, leader 1, replicas: 1, isrs: 1
partition 9, leader 1, replicas: 1, isrs: 1
partition 10, leader 1, replicas: 1, isrs: 1
partition 11, leader 1, replicas: 1, isrs: 1
topic "__confluent.support.metrics" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
/ # kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Of
fset: %o Headers: %h\n'
%3|1607719862.133|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Local: Broker transport failure: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
这个 kafkacat 脚本有什么问题吗?
kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'