1

我尝试按照此文档在 docker-desktop k8s 上运行 srimzi 。

我设置了所有的东西。

$ kubectl get all -n my-kafka-project                                           
NAME                                              READY   STATUS    RESTARTS   AGE
pod/my-cluster-entity-operator-7ddb6d5b88-q2xg8   3/3     Running   0          97m
pod/my-cluster-kafka-0                            1/1     Running   0          98m
pod/my-cluster-zookeeper-0                        1/1     Running   0          98m

NAME                                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/my-cluster-kafka-0                    NodePort    10.99.207.179    <none>        9094:31805/TCP               98m
service/my-cluster-kafka-bootstrap            ClusterIP   10.103.193.53    <none>        9091/TCP,9092/TCP,9093/TCP   98m
service/my-cluster-kafka-brokers              ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   98m
service/my-cluster-kafka-external-bootstrap   NodePort    10.97.198.62     <none>        9094:31314/TCP               98m
service/my-cluster-zookeeper-client           ClusterIP   10.101.206.203   <none>        2181/TCP                     98m
service/my-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   98m

NAME                                         READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-cluster-entity-operator   1/1     1            1           97m

NAME                                                    DESIRED   CURRENT   READY   AGE
replicaset.apps/my-cluster-entity-operator-7ddb6d5b88   1         1         1       97m

NAME                                    READY   AGE
statefulset.apps/my-cluster-kafka       1/1     98m
statefulset.apps/my-cluster-zookeeper   1/1     98m

并获取节点端口:

$ kubectl get service my-cluster-kafka-external-bootstrap -n my-kafka-project -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'
31314

虽然本文档使用 minikube,但我可以通过 访问 docker-desktop 节点localhost,因此我尝试访问localhost:31314,但无法生成主题消息。看来我可以成功连接到代理,但不能连接到主题。

./kafka-console-producer.sh --broker-list localhost:31314 --topic my-topic 
>Test
[2021-05-15 15:41:13,681] ERROR Error when sending message to topic my-topic with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in metadata after 60000 ms.

我用kafkacat检查过,主题本身已成功创建。

kafkacat -b localhost:31314 -L
Metadata for all topics (from broker -1: localhost:31314/bootstrap):
 1 brokers:
  broker 0 at 192.168.65.4:31805 (controller)
 4 topics:
  topic "__strimzi_store_topic" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "my-topic" with 3 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
    partition 1, leader 0, replicas: 0, isrs: 0
    partition 2, leader 0, replicas: 0, isrs: 0
  topic "__strimzi-topic-operator-kstreams-topic-store-changelog" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "__consumer_offsets" with 50 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
    partition 1, leader 0, replicas: 0, isrs: 0
    partition 2, leader 0, replicas: 0, isrs: 0
    partition 3, leader 0, replicas: 0, isrs: 0
    partition 4, leader 0, replicas: 0, isrs: 0
    partition 5, leader 0, replicas: 0, isrs: 0
    ...

我不完全了解消息如何发送到主题的机制,那么接下来我应该检查哪一点?

4

1 回答 1

2

根据 Jakub 的评论,我可以通过更改命名空间中的Kafka资源来生成和使用来自主机的消息my-kafka-project

我已经覆盖了外部侦听器中的广告侦听器.spec.kafka.listeners

从:

- name: external
  port: 9094
  tls: false
  type: nodeport

至:

- name: external
  port: 9094
  tls: false
  type: nodeport
  configuration:
    brokers:
    - broker: 0
      advertisedHost: localhost
      advertisedPort: 31314
于 2021-05-15T16:17:09.620 回答