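I am trying to follow the tutorial Deploying Debezium using the new KafkaConnector resource. As in the tutorial, I am using minikube, but with the docker driver. Otherwise I followed it exactly, step by step.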

However, at the "Create the connector" step, after creating the connector with

cat <<EOF | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 192.168.99.1
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true" 
EOF

and checking it with

kubectl -n kafka get kctr inventory-connector -o yaml

I get an error:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1
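
A side observation on the output above: the stored spec shows database.user and database.password as empty strings, although the manifest set them to ${file:...} placeholders. One possible explanation (an assumption, not something confirmed in this thread) is that the heredoc delimiter EOF is unquoted, so the shell expands ${...} itself before kubectl ever sees the text. A minimal sketch of the difference, using a made-up variable named placeholder:

```shell
# Hypothetical variable, only here to make the expansion visible.
placeholder="expanded-by-shell"

# Unquoted delimiter: the shell substitutes ${placeholder} inside the heredoc.
unquoted=$(cat <<EOF
database.user: "${placeholder}"
EOF
)

# Quoted delimiter: the heredoc body is passed through literally.
quoted=$(cat <<'EOF'
database.user: "${placeholder}"
EOF
)

echo "$unquoted"
echo "$quoted"
```

If this is what happened, writing the manifest with cat <<'EOF' (or escaping the dollar sign as \$) should keep the ${file:...} references intact for Kafka Connect to resolve.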

I tried changing

database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"

to

database.user: "debezium"
database.password: "dbz"

using the user name and password from the "Secure the database credentials" step directly, and re-applied.

Also, the tutorial says:

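"I'm using database.hostname: 192.168.99.1 as the IP address for connecting to MySQL because I'm using minikube with the virtualbox VM driver. If you're using a different VM driver with minikube, you might need a different IP address."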

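I am actually a bit confused by this description. MySQL in the demo runs in Docker, while the rest, such as Kafka, runs in minikube. Why does the description of database.hostname talk about minikube rather than Docker?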

Anyway, when I run minikube ip, I get 192.168.49.2. However, after changing database.hostname to 192.168.49.2 and running kubectl get kctr inventory-connector -o yaml -n kafka, I get

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1

I can reach MySQL at localhost because it is hosted in Docker. However, changing database.hostname to localhost did not fix the problem either.

Any ideas? Thanks!

1 Answer

The problem was that the services running in minikube could not communicate with MySQL running in Docker.

On how to reach the host's localhost from inside a Kubernetes cluster, I found How to access host's localhost from inside kubernetes cluster.

However, I ended up deploying MySQL in Kubernetes instead:

kubectl apply -f https://k8s.io/examples/application/mysql/mysql-pv.yaml
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-deployment.yaml

(copied from https://kubernetes.io/docs/tasks/run-application/run-single-instance-stateful-application/) and changed the connector config to:

database.hostname: "mysql.default" # service `mysql` in namespace `default`
database.port: "3306"
database.user: "root"
database.password: "password"
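
The hostname mysql.default follows the Kubernetes Service DNS pattern <service>.<namespace>, which pods in other namespaces (here kafka) can resolve through their DNS search path. As a small sketch, the helper below builds the fully qualified form; the function name service_dns is made up for illustration, and cluster.local is assumed as the cluster domain, which is the usual default but can differ per cluster.

```shell
# Hypothetical helper: build the fully qualified DNS name of a Service.
# $1 = service name, $2 = namespace, $3 = cluster domain (assumed default: cluster.local)
service_dns() {
  service="$1"
  namespace="$2"
  domain="${3:-cluster.local}"
  printf '%s.%s.svc.%s\n' "$service" "$namespace" "$domain"
}

# The Service created by the MySQL example lives in the default namespace.
service_dns mysql default
```

Either the short form mysql.default or the fully qualified name should work as database.hostname from the Kafka Connect pods.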

Now when I run

kubectl -n kafka get kctr inventory-connector -o yaml

I get a new error saying that MySQL is not configured with a row-level binlog. That does mean, however, that the connector can now reach MySQL.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"mysql.default","database.password":"password","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T19:36:52Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "2918"
  uid: 48bb46e1-42bb-4574-a3dc-221ae7d6a803
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: mysql.default
    database.password: password
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: root
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T19:36:53.605Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: UNASSIGNED
      worker_id: 172.17.0.8:8083
    name: inventory-connector
    tasks:
    - id: 0
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException: The MySQL server is
        not configured to use a row-level binlog, which is required for this connector
        to work properly. Change the MySQL configuration to use a row-level binlog
        and restart the connector.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:207)\n\tat
        io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat
        org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
        java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
        java.lang.Thread.run(Thread.java:748)\n"
      worker_id: 172.17.0.8:8083
    type: source
  observedGeneration: 1
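
The remaining error asks for a row-level binlog on the MySQL side. As a rough sketch only, the function below prints the kind of [mysqld] settings that turn on row-based binary logging; the function name binlog_cnf, the server-id value, and the log base name mysql-bin are illustrative assumptions, and how the fragment is mounted into the MySQL pod is not covered here.

```shell
# Hypothetical helper: print a minimal [mysqld] fragment enabling row-level binlog.
# The server-id is an arbitrary example; it must differ from the connector's
# database.server.id (184054 in the config above).
binlog_cnf() {
  cat <<'EOF'
[mysqld]
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
EOF
}

binlog_cnf
```

After MySQL restarts with settings like these, the connector task would need to be restarted as the error message suggests.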
Answered 2021-09-29T19:49:16.837