
I'm using Confluent and kafka-connect-oracle ( https://github.com/erdemcer/kafka-connect-oracle ) to track changes in an Oracle Database 11g XE instance. I can view the schema content through the Schema Registry API, for example with "curl -X GET http://localhost:8081/schemas/ids/44":

{"subject":"TEST.KAFKAUSER.TEST-value","version":1,"id":44,"schema":"{"type":"record","name":"row","namespace":"test.kafkauser.test","fields":[{"name":"SCN","type":"long"},{"name":"SEG_OWNER","type":"string"},{"name":"TABLE_NAME","type":"string"},{"name":"TIMESTAMP","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"SQL_REDO","type":"string"},{"name":"OPERATION","type":"string"},{"name":"data","type":["null",{"type":"record","name":"value","namespace":"","fields":[{"name":"ID","type":["null","double"],"default":null},{"name":"NAME","type":["null","string"],"default":null}],"connect.name":"value"}],"default":null},{"name":"before","type":["null","value"],"default":null}],"connect.name":"test.kafkauser.test.row"}","deleted":false}

However, this schema cannot be parsed by the Confluent Schema Registry client in Python:

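from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer

schemaRegistryClientURL = "http://localhost:8081"
# Client from the confluent.schemaregistry package; it caches the schemas it fetches
schema_registry_client = CachedSchemaRegistryClient(url=schemaRegistryClientURL)
# Fetch schema ID 44 (the same schema as the curl call above); this call raises the error below
schema_registry_client.get_by_id(44)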

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "build/bdist.linux-x86_64/egg/confluent/schemaregistry/client/CachedSchemaRegistryClient.py", line 140, in get_by_id
confluent.schemaregistry.client.ClientError: Received bad schema from registry.

Is kafka-connect-oracle sending an invalid schema to the Schema Registry? How can I convert this schema into the correct format?

Thanks.

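It looks like there is a problem with your schema. A JSON formatter reports that it is not valid JSON. You can check whether your JSON is well formed here: https://jsonformatter.curiousconcept.com/#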
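As an offline alternative to the online formatter, Python's standard `json` module can run the same check and report where parsing stops. A minimal sketch; the helper name `find_json_error` and the shortened sample string are illustrative, not taken from the question:

```python
import json
from typing import Optional


def find_json_error(text: str) -> Optional[str]:
    """Return None if `text` parses as JSON, otherwise a description of where parsing failed."""
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        # err.lineno, err.colno and err.pos mark the character where the parser gave up
        return f"{err.msg} at line {err.lineno}, column {err.colno} (char {err.pos})"
    return None


# Shortened, hypothetical copy of the registry output as pasted in the question
broken = '{"subject":"TEST.KAFKAUSER.TEST-value","schema":"{"type":"record"}","deleted":false}'
print(find_json_error(broken))
print(find_json_error('{"type":"record"}'))
```

The first call reports the position right after `"schema":"{"`, where the parser expected a comma; the second prints `None` because the input is valid.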

Looking at it, I can see two extra quotes:

The first one is on the first line, right after "schema":

The second one is on the last line, between test.row"} and ,"deleted":false}

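After removing these two quotes, the JSON is valid. If you are asking for a way to do this automatically, I don't know of one. Perhaps you can look for some Python code that validates and fixes JSON formatting.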
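One way to automate the two deletions described above is a plain string replacement. This is a minimal sketch, assuming the text has exactly the shape shown in the question (exactly one `"schema":"{` and one `}","deleted"`); the helper `strip_schema_quotes` is hypothetical and is not a general JSON repair tool:

```python
import json


def strip_schema_quotes(text: str) -> dict:
    """Remove the quote before and after the embedded schema object, then parse the result.

    Hypothetical helper: it only handles text shaped like the output in the question.
    """
    start_marker, end_marker = '"schema":"{', '}","deleted"'
    if text.count(start_marker) != 1 or text.count(end_marker) != 1:
        raise ValueError("text does not have the expected shape")
    fixed = text.replace(start_marker, '"schema":{').replace(end_marker, '},"deleted"')
    return json.loads(fixed)


# Shortened, hypothetical sample with the same two extra quotes as the question's output
broken = ('{"subject":"TEST.KAFKAUSER.TEST-value","version":1,"id":44,'
          '"schema":"{"type":"record","name":"row","fields":[{"name":"SCN","type":"long"}],'
          '"connect.name":"test.kafkauser.test.row"}","deleted":false}')
fixed = strip_schema_quotes(broken)
print(fixed["schema"]["name"])
```

The marker counts are checked first so the function fails loudly instead of silently editing text with a different layout.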

Here is the valid version:

{
   "subject":"TEST.KAFKAUSER.TEST-value",
   "version":1,
   "id":44,
   "schema":{
      "type":"record",
      "name":"row",
      "namespace":"test.kafkauser.test",
      "fields":[
         {
            "name":"SCN",
            "type":"long"
         },
         {
            "name":"SEG_OWNER",
            "type":"string"
         },
         {
            "name":"TABLE_NAME",
            "type":"string"
         },
         {
            "name":"TIMESTAMP",
            "type":{
               "type":"long",
               "connect.version":1,
               "connect.name":"org.apache.kafka.connect.data.Timestamp",
               "logicalType":"timestamp-millis"
            }
         },
         {
            "name":"SQL_REDO",
            "type":"string"
         },
         {
            "name":"OPERATION",
            "type":"string"
         },
         {
            "name":"data",
            "type":[
               "null",
               {
                  "type":"record",
                  "name":"value",
                  "namespace":"",
                  "fields":[
                     {
                        "name":"ID",
                        "type":[
                           "null",
                           "double"
                        ],
                        "default":null
                     },
                     {
                        "name":"NAME",
                        "type":[
                           "null",
                           "string"
                        ],
                        "default":null
                     }
                  ],
                  "connect.name":"value"
               }
            ],
            "default":null
         },
         {
            "name":"before",
            "type":[
               "null",
               "value"
            ],
            "default":null
         }
      ],
      "connect.name":"test.kafkauser.test.row"
   },
   "deleted":false
}
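Another possible route, assuming you have the raw HTTP response body: the Schema Registry REST API returns the `schema` field as a string that contains the schema JSON, with its inner quotes escaped as `\"`. Decoding the body and then decoding that string gives a nested object like the one above. A minimal sketch; the response body below is a shortened, hypothetical example and `expand_schema` is an illustrative helper:

```python
import json

# Hypothetical raw response body: "schema" holds a JSON document encoded as a string,
# so its inner quotes are escaped (\").
raw = ('{"subject":"TEST.KAFKAUSER.TEST-value","version":1,"id":44,'
       '"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"row\\",'
       '\\"namespace\\":\\"test.kafkauser.test\\",'
       '\\"fields\\":[{\\"name\\":\\"SCN\\",\\"type\\":\\"long\\"}]}",'
       '"deleted":false}')


def expand_schema(body: str) -> dict:
    """Parse a registry response and replace its string-encoded "schema" with the parsed object."""
    response = json.loads(body)  # first pass: the outer response object
    response["schema"] = json.loads(response["schema"])  # second pass: the schema string itself
    return response


expanded = expand_schema(raw)
print(json.dumps(expanded, indent=3))
```

`json.dumps(..., indent=3)` prints the result with the same three-space indentation used above.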
answered 2021-11-01T09:08:52.937