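Goal: I want to analyze the headers of the messages in a topic, and I am looking for a straightforward way to view them. I don't want to develop an extra application or write additional code for this. Any simple tool for viewing headers would be useful.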

In this question I read that kafkacat can do it like this:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

So I would like to start kafkacat as part of my docker-compose. My first attempt was:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  kafkacat:
    image: confluentinc/cp-kafkacat
    command: 
      - bash 
      - -c 
    links:
      - broker

and I got:

broker         | [2020-12-11 10:46:13,084] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
broker         | [2020-12-11 10:46:13,084] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
confluentinc_kafkacat_1 exited with code 2
broker         | [2020-12-11 10:51:13,085] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)

My second attempt, based on the kafkacat tutorial:

C:\Users\DEMETRC>docker run --tty confluentinc/cp-kafkacat kafkacat -b localhost:9092 -L
% ERROR: Failed to acquire metadata: Local: Broker transport failure

PS: My Docker Kafka is available at localhost:9092, without a Docker network. In the tutorial's example, Kafka is available at kafka:29092 on the Docker network docker-compose_default.

Any clue how to add kafkacat to my docker-compose? Any other suggestions for inspecting message headers?

*** First edit, after Robin Moffatt's answer

I edited the docker-compose file to add Robin's suggestion:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

Here is my topic description:

# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demotopic
Created topic demotopic.
# kafka-topics --describe --bootstrap-server localhost:9092 --topic demotopic
Topic: demotopic        PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: demotopic        Partition: 0    Leader: 1       Replicas: 1     Isr: 1
#

And here is the error I get when I try to use kafkacat:

/ # kafkacat -b localhost:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
%3|1607709622.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1607709623.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
% ERROR: Failed to query metadata for topic demotopic: Local: Broker transport failure
/ #  

*** Second edit

Now I can list all topics with kafkacat, but I still cannot get the message headers:

/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
 1 brokers:
  broker 1 at localhost:9092 (controller)
 4 topics:
  topic "demotopic" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-license" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-metrics" with 12 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
    partition 3, leader 1, replicas: 1, isrs: 1
    partition 4, leader 1, replicas: 1, isrs: 1
    partition 5, leader 1, replicas: 1, isrs: 1
    partition 6, leader 1, replicas: 1, isrs: 1
    partition 7, leader 1, replicas: 1, isrs: 1
    partition 8, leader 1, replicas: 1, isrs: 1
    partition 9, leader 1, replicas: 1, isrs: 1
    partition 10, leader 1, replicas: 1, isrs: 1
    partition 11, leader 1, replicas: 1, isrs: 1
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
/ # kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
%3|1607719862.133|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Local: Broker transport failure: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)

Is there anything wrong with this kafkacat command?

kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'

1 Answer


The message confluentinc_kafkacat_1 exited with code 2 appears because the container exited: you overrode the startup command to launch only bash, which starts and then exits immediately.

   command: 
      - bash 
      - -c 
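
The exit code can be reproduced outside Docker. The following is a minimal sketch, assuming bash is installed locally; the variable name status is illustrative only. It runs bash with -c but no command string, which is what the command: entry above amounts to:

```shell
# Run bash with -c but without a command string, mirroring the compose
# `command:` entry. bash treats this as a usage error and exits at once.
status=0
bash -c 2>/dev/null || status=$?

# Show the exit status that Docker Compose would report for the container.
echo "bash -c exited with status $status"
```

A non-zero status here means the process never stays in the foreground, so the container has nothing left to run and stops.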

This file shows a working example of what you need, in which the container stays running:

 entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

To Aydin's point that you don't need kafkacat in Docker: you can run it locally, but I find it easier to include it in the Docker Compose so that you're not dependent on a local install.

If you want to run kafkacat outside of Docker Compose but still in Docker, using docker run, you can, but bear in mind the networking implications. If you use localhost, it is relative to the container itself, i.e. where kafkacat is running. Instead you need to make the Kafka broker accessible to the kafkacat container, e.g. by attaching the container to the host network:

docker run --network host --interactive --rm edenhill/kafkacat:1.6.0 -b localhost:9092 -L
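
The connection-refused error in the question's second edit looks consistent with the same networking point. The -L output there lists the broker as localhost:9092, which is the address advertised on the PLAINTEXT_HOST listener, so a client inside the Compose network is redirected to its own localhost. The following is a minimal sketch, assuming the compose file from the question is unchanged and that the PLAINTEXT listener is the one intended for other containers; the variable names are illustrative only:

```shell
# Value copied from KAFKA_ADVERTISED_LISTENERS in the question's compose file.
advertised='PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'

# Pick the address advertised on the PLAINTEXT listener, i.e. the one that
# resolves from other containers on the Compose network.
bootstrap=$(printf '%s\n' "$advertised" | tr ',' '\n' | sed -n 's|^PLAINTEXT://||p')

# Print the command to run inside the kafkacat container
# (for example after: docker exec -it kafkacat sh).
printf '%s\n' "kafkacat -b $bootstrap -t demotopic -C -f 'Offset: %o Headers: %h\n'"
```

From the host machine, localhost:9092 remains the address to use, since that is what the PLAINTEXT_HOST listener advertises.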
answered 2020-12-11T13:46:47.720