I installed Strimzi with:
helm repo add strimzi https://strimzi.io/charts/ && helm install strimzi-kafka strimzi/strimzi-kafka-operator
Output:
Name: strimzi-cluster-operator-587cb79468-hrs9q
strimzi-cluster-operator with (quay.io/strimzi/operator:0.28.0)
I built the Kafka Connect image (strimzi-kafka-connect:0.28) using the following Dockerfile:
FROM quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
COPY ./debezium-connector-mongodb/ /opt/kafka/plugins/debezium/
COPY ./confluentinc-kafka-connect-elasticsearch/ /opt/kafka/plugins/debezium/
COPY ./mongodb-kafka-connect-mongodb-1.7.0/ /opt/kafka/plugins/debezium/
RUN chown -R kafka:root /opt/kafka
USER 1001
Here is the KafkaConnect configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: strimzi-kafka-connect:0.28.2
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
Kafka was installed from https://github.com/strimzi/strimzi-kafka-operator/releases/tag/0.28.0
Specifically, I used kafka-persistent.yaml, which has the following configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.1.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.1"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
Here is the list of connector plugins reported by the Connect worker:
kubectl exec my-connect-cluster-connect-59cfff997b-4kv9b -it my-connect-cluster-connect -- curl http://localhost:8083/connector-plugins | jq '.'
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.7.0"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.7.0"
  },
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "11.1.8"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "1.8.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.0.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
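For anyone reproducing this, the plugin list above can be checked mechanically rather than by eye. The following is only a minimal sketch: the function names and the EXPECTED_CLASSES list are hypothetical helpers chosen for illustration, and the only thing taken from the post is the shape of the /connector-plugins response.

```python
import json

# Connector classes this setup is expected to provide (taken from the post).
EXPECTED_CLASSES = [
    "com.mongodb.kafka.connect.MongoSourceConnector",
    "io.debezium.connector.mongodb.MongoDbConnector",
    "io.debezium.connector.mysql.MySqlConnector",
]


def installed_classes(plugins_json: str) -> set:
    """Return the connector class names found in a /connector-plugins response body."""
    return {entry["class"] for entry in json.loads(plugins_json)}


def missing_classes(plugins_json: str, expected=EXPECTED_CLASSES) -> list:
    """Return the expected class names the worker did not report, in input order."""
    installed = installed_classes(plugins_json)
    return [name for name in expected if name not in installed]
```

Passing the raw response body to missing_classes() returns an empty list when every expected class is registered. Note that a class being listed only shows that the worker discovered it, not that all of its dependencies load correctly.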
When I create a connector for MySQL, everything works fine.
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
> -H "Content-Type:application/json" http://localhost:8083/connectors/ \
> -d '{
> "name": "inventory-test-mysql",
> "config": {
> "connector.class": "io.debezium.connector.mysql.MySqlConnector",
> "tasks.max": "1",
> "database.hostname": "172.17.0.7",
> "database.port": "3306",
> "database.user": "root",
> "database.password": "debezium",
> "database.server.id": "184054",
> "database.server.name": "dbserver1",
> "database.include.list": "inventory",
> "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
> "database.history.kafka.topic": "schema-changes-for-inventory",
> "include.schema.changes": "true"
> }
> }'
HTTP/1.1 201 Created
Date: Tue, 22 Feb 2022 11:50:14 GMT
Location: http://localhost:8083/connectors/inventory-test-mysql
Content-Type: application/json
Content-Length: 560
Server: Jetty(9.4.43.v20210629)
{"name":"inventory-test-mysql","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"172.17.0.7","database.port":"3306","database.user":"root","database.password":"debezium","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes-for-inventory","include.schema.changes":"true","name":"inventory-test-mysql"},"tasks":[],"type":"source"}
The topics and the connector are created successfully. Here is the output:
➜ ~ kubectl exec --tty -i kafka-client-strimzi -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
schema-changes-customers
schema-changes-for-inventory
Connector status:
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl http://localhost:8083/connectors
["inventory-test-mysql"]
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl http://localhost:8083/connectors/inventory-test-mysql/status
{"name":"inventory-test-mysql","connector":{"state":"RUNNING","worker_id":"172.17.0.19:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.19:8083"}],"type":"source"}
Whenever I add or update a record in a table of the inventory database, I receive the schema changes.
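The status response shown above can also be checked programmatically. This is a small sketch under the assumption that the response always has the "connector" and "tasks" fields seen in the output; unhealthy_parts is a hypothetical helper name, not part of any Kafka Connect client.

```python
import json


def unhealthy_parts(status_json: str) -> list:
    """List the connector and task entries whose state is not RUNNING.

    Expects the body returned by GET /connectors/<name>/status.
    An empty list means the connector and all of its tasks are RUNNING.
    """
    status = json.loads(status_json)
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems
```

For the inventory-test-mysql status printed above, this returns an empty list, which matches the behaviour described: the MySQL connector and its single task are both running.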
Problem:
This whole setup does not work for
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.7.0"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.7.0"
  }
or
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "1.8.1.Final"
  }
Here is the curl output:
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
> -H "Content-Type:application/json" http://localhost:8083/connectors/ \
> -d '{
> "name": "mongodb-connector",
> "config": {
> "tasks.max":1,
> "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
> "connection.uri":"mongodb://root:password@172.17.0.20:27017",
> "key.converter":"org.apache.kafka.connect.storage.StringConverter",
> "value.converter":"org.apache.kafka.connect.storage.StringConverter",
> "key.converter.schemas.enable": "false",
> "value.converter.schemas.enable": "false",
> "database":"mydb",
> "collection":"dataSource"
> }
> }'
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.43.v20210629)
{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-1d98daa0",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}
or with
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
> -H "Content-Type:application/json" http://localhost:8083/connectors/ \
> -d '{
> "name": "mongodb-connector",
> "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": "rs0/172.17.0.20:27017,rs0/172.17.0.21:27017",
> "mongodb.name": "mydb",
> "mongodb.user": "root",
> "mongodb.password": "password",
> "database.whitelist": "mydb[.]*"
> }
> }'
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.43.v20210629)
{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-1d98daa0",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}
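The 500 responses above carry no detail about what failed. One way to try to get more information is the Kafka Connect REST endpoint PUT /connector-plugins/<class>/config/validate, which takes the flat config map as its body. The sketch below only builds that request and summarises a validation response; the helper names are hypothetical, nothing is sent until the request is passed to urllib.request.urlopen, and it is an assumption that validation reports more than the create call did. If the failure happens while the plugin classes are loaded, the worker log of the Connect pod is more likely to show the cause.

```python
import json
import urllib.request
from urllib.parse import quote


def build_validate_request(base_url: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a config validation request for one connector class."""
    connector_class = config["connector.class"]
    url = (
        f"{base_url.rstrip('/')}/connector-plugins/"
        f"{quote(connector_class, safe='')}/config/validate"
    )
    # Connect treats config values as strings, so numbers are stringified here.
    body = json.dumps({key: str(value) for key, value in config.items()})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


def collect_errors(validation: dict) -> dict:
    """Map each config key to its error messages, skipping keys without errors."""
    errors = {}
    for entry in validation.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = value["errors"]
    return errors
```

A call such as build_validate_request("http://localhost:8083", {"name": "mongodb-connector", "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "..."}) yields a request that could be sent from inside the Connect pod, and collect_errors() can then be applied to the decoded JSON response.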
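I would appreciate it if you could let me know what I am missing.
If creating these connectors through the REST API is not possible with Strimzi and MongoDB, what is the alternative?
Is it related to the version of Strimzi/Kafka? Is it related to the version of the MongoDB plugin?
Thanks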