
I have a pipeline in which a Debezium CDC MySQL connector running on the Confluent Platform is connected to Confluent Cloud (the cloud's built-in Debezium MySQL connector is still in preview). The connection is established successfully, and the messages from the topic are consumed by an S3 sink connector. Initially the stream was in JSON format, but I now want it in Avro, so I changed the key and value converters in the connector configuration file as shown below:

Debezium connector JSON:

{
    "name":"mysql_deb3",
    "config":{
       "connector.class":"io.debezium.connector.mysql.MySqlConnector",
       "tasks.max":"1",
       "database.hostname":"host_name",
       "database.port":"3306",
       "database.user":"user_name",
       "database.password":"password",
       "database.server.id":"123456789",
       "database.server.name": "server_name",
       "database.whitelist":"db_name",
       "database.history.kafka.topic":"dbhistory.db_name",
       "include.schema.changes": "true",
       "table.whitelist": "db_name.table_name",
       "tombstones.on.delete": "false",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "value.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "key.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "value.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "decimal.handling.mode": "double",
       "transforms": "unwrap",
       "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.drop.tombstones": "true",
       "transforms.unwrap.delete.handling.mode": "rewrite",
"database.history.kafka.bootstrap.servers":"confluent_cloud_kafka_server_endpoint:9092",
"database.history.consumer.security.protocol":"SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm":"https",
"database.history.consumer.sasl.mechanism":"PLAIN",
"database.history.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"cloud_kafka_api\" password=\"cloud_kafka_api_secret\";",
"database.history.producer.security.protocol":"SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm":"https",
"database.history.producer.sasl.mechanism":"PLAIN",
"database.history.producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"cloud_kafka_api\" password=\"cloud_kafka_api_secret\";",
    }
 }

###################################################################

connect-distributed.properties:

bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
request.timeout.ms=20000
retry.backoff.ms=500

producer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
producer.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

consumer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500

offset.flush.interval.ms=10000
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

schema.registry.url=https://cloud_schema_registry_endpoint
schema.registry.basic.auth.user.info=<schema_registry_api_key>:<schema_registry_api_secret>

################################################

-- I start Kafka Connect with --> bin/connect-distributed etc/connect-distributed.properties

- Connect starts up fine, but when I try to load the Debezium connector with a curl command (see the sketch below), it fails with the "Unauthorized" error shown further down, even though the API key and secret I provided are correct; I have also verified them manually with the CLI.
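
For reference, I load the connector with a standard call to the Connect REST API along these lines (the worker address localhost:8083 and the file name mysql_deb3.json are placeholders for my actual setup):

    # Register the connector by POSTing the JSON config above to the Connect worker
    curl -s -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d @mysql_deb3.json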

Caused by: org.apache.kafka.connect.errors.DataException: staging-development-rds-cluster
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:78)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"SchemaChangeKey","namespace":"io.debezium.connector.mysql","fields":[{"name":"databaseName","type":"string"}],"connect.name":"io.debezium.connector.mysql.SchemaChangeKey"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:119)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:156)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:117)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-11-30 05:30:47,389] ERROR WorkerSourceTask{id=mysql_deb3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-11-30 05:30:47,389] INFO Stopping connector (io.debezium.connector.common.BaseSourceTask:187)
[2020-11-30 05:30:47,389] INFO Stopping MySQL connector task (io.debezium.connector.mysql.MySqlConnectorTask:458)
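
To be clear about the manual check I mentioned: the Schema Registry API key and secret do work when I call the Schema Registry REST endpoint directly, roughly like this (same endpoint and credential placeholders as above):

    # Basic-auth request listing subjects; this succeeds with my key/secret
    curl -s -u schema_registry_api_key:schema_registry_api_secret \
      https://cloud_schema_registry_endpoint/subjects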

Could anyone please help me figure out what is going wrong here? Thanks in advance.
