
I have set up Kafka, a Kafka Connect cluster, and the AWS Glue Schema Registry with Debezium for CDC. Loading data into the Kafka topic with Debezium works fine, but the sink connector fails with the error below when it tries to write the data from the Kafka topic to an S3 bucket.

For example, the connector configuration:

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "somebucketname",
    "name": "s3-sink-connector",
    "tasks.max": "1",
    "s3.region": "ap-south-1",
    "topics":"second_mongo_conn.chakra_dev.xiomileads",
    "timezone": "UTC",
    "locale": "en",
    "flush.size": "2",
    "rotate.interval.ms": "3600",
    "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
    "key.converter.schemas.enable": "true",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter.schemas.enable": "false",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
    "value.converter.schemas.enable": "true",
    "key.converter.schemaAutoRegistrationEnabled": "true",
    "value.converter.schemaAutoRegistrationEnabled": "true",
    "key.converter.avroRecordType": "GENERIC_RECORD",
    "value.converter.avroRecordType": "GENERIC_RECORD",
    "value.converter.dataFormat": "JSON",
    "value.converter.region": "ap-south-1",
    "key.converter.region": "ap-south-1",
    "key.converter.registry.name": "sampleregistry",
    "key.converter.dataFormat": "JSON", 
    "value.converter.registry.name": "sampleregistry",
    "key.converter.schemaName":"second_mongo_conn.chakra_dev.xiomileads",
    "value.converter.schemaName":"second_mongo_conn.chakra_dev.xiomileads"
  }
}
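
For reference, I register the connector by POSTing this JSON to the Kafka Connect REST API. A minimal sketch in Python, assuming the Connect worker's REST endpoint is the default http://localhost:8083 (adjust for your cluster):

import json
import requests

# Connector definition as above (only a couple of keys shown here;
# the full "config" map is exactly the JSON posted above).
connector = {
    "name": "s3-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "second_mongo_conn.chakra_dev.xiomileads",
        # ... remaining keys from the configuration above ...
    },
}

# Create the connector on the Connect cluster.
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.text)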

I am getting the following error:

Error converting message value in topic 'second_mongo_conn.chakra_dev.xiomileads' partition 0 at offset 0 and timestamp 1632977820814: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:542)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:165)

.. ..
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to get the schema version
    at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionResponse(AWSSchemaRegistryClient.java:226)
.. ..
Caused by: software.amazon.awssdk.services.glue.model.AccessDeniedException: User: arn:aws:sts::409503125457:assumed-role/smtp_role/i-0e4e544391277a376 is not authorized to perform: glue:GetSchemaVersion (Service: Glue, Status Code: 400, Request ID: 7fe5f384-f9a7-442c-824b-00761e729730, Extended Request ID: null)
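
The last "Caused by" is the actual failure: the EC2 instance role smtp_role that the Connect worker assumes is denied glue:GetSchemaVersion when the JsonSchemaConverter looks up the schema. To confirm this from the Connect host, the same Glue call can be reproduced with boto3; a sketch, using the registry and schema names from the converter settings above:

import boto3
from botocore.exceptions import ClientError

# Uses the same credential chain as the Connect worker (the instance role here).
glue = boto3.client("glue", region_name="ap-south-1")

try:
    # Essentially the glue:GetSchemaVersion call that is denied in the stack trace.
    resp = glue.get_schema_version(
        SchemaId={
            "RegistryName": "sampleregistry",
            "SchemaName": "second_mongo_conn.chakra_dev.xiomileads",
        },
        SchemaVersionNumber={"LatestVersion": True},
    )
    print("OK:", resp["SchemaVersionId"], resp["Status"])
except ClientError as e:
    # AccessDeniedException here reproduces the permission problem outside Connect.
    print("Denied:", e.response["Error"]["Code"], e.response["Error"]["Message"])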
