2

我正在尝试将此消息发布到 Kafka Rest:

{
  "key_schema": "[\"null\",\"long\"]",
  "value_schema": "{\"type\":\"record\",\"name\":\"GVEDGE_FI_INVOICE_TX_INFO_V2\",\"namespace\":\"com.vistajet.gvedge.infrastructure.kafka.connector\",\"fields\":[{\"name\":\"XID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"INVOICE_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"INVOICE_DETAIL_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ADDITIONAL_INVL_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"COST_ALLOCATION_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"com.vistajet.gvedge.infrastructure.kafka.connector.GVEDGE_FI_INVOICE_TX_INFO_V2\"}",
  "records": [
    {
      "key": {
        "long": 140766
      },
      "value": {
        "XID": {
          "long": 29524623
        },
        "ID": {
          "long": 140766
        },
        "INVOICE_CHANGE_COUNT": {
          "long": 1
        },
        "INVOICE_DETAIL_CHANGE_COUNT": {
          "long": 0
        },
        "ADDITIONAL_INVL_CHANGE_COUNT": {
          "long": 0
        },
        "COST_ALLOCATION_CHANGE_COUNT": {
          "long": 0
        }
      }
    }
  ]
}

我在 rest-proxy.log 中有以下错误:

[2019-04-11 14:25:58,991] 错误未处理的异常导致内部服务器错误响应 (io.confluent.rest.exceptions.GenericExceptionMapper:38) org.apache.kafka.common.errors.SerializationException:注册 Avro 架构时出错:“long” 原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:正在注册的模式与早期模式不兼容;错误代码:409

为什么会这样???我不明白为什么

"key": {
        "long": 140766
      }

是否正在触发新模式的创建?...

顺便提一句

"key": null

工作正常。

4

1 回答 1

1

您正在将键值设置为对象。

"key": {
        "long": 140766
      }

这不反映您的架构:

"key_schema": "[\"null\",\"long\"]",

尝试这个:

{
  "key_schema": "{\"type\":\"long\"}",
  "value_schema": "{\"type\":\"record\",\"name\":\"GVEDGE_FI_INVOICE_TX_INFO_V2\",\"namespace\":\"com.vistajet.gvedge.infrastructure.kafka.connector\",\"fields\":[{\"name\":\"XID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"INVOICE_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"INVOICE_DETAIL_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ADDITIONAL_INVL_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"COST_ALLOCATION_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"com.vistajet.gvedge.infrastructure.kafka.connector.GVEDGE_FI_INVOICE_TX_INFO_V2\"}",
  "records": [
    {
      "key": 234,
      "value": {
        "XID": {
          "long": 29524623
        },
        "ID": {
          "long": 140766
        },
        "INVOICE_CHANGE_COUNT": {
          "long": 1
        },
        "INVOICE_DETAIL_CHANGE_COUNT": {
          "long": 0
        },
        "ADDITIONAL_INVL_CHANGE_COUNT": {
          "long": 0
        },
        "COST_ALLOCATION_CHANGE_COUNT": {
          "long": 0
        }
      }
    }
  ]
}

现在您的密钥是 long 类型的。因此,您只能在输入数据的旁边传递数字。

但是,如果您想将键值保留为对象,就像您在上面的示例中一样,那么您还需要将键定义为记录类型:

{
  "key_schema": "{\"type\":\"record\",\"name\":\"GVEDGE_FI_INVOICE_TX_INFO_V2_KEY\",\"namespace\":\"com.vistajet.gvedge.infrastructure.kafka.connector\",\"fields\":[{\"name\":\"long\",\"type\":[\"null\",\"long\"]}]}",
  "value_schema": "{\"type\":\"record\",\"name\":\"GVEDGE_FI_INVOICE_TX_INFO_V2\",\"namespace\":\"com.vistajet.gvedge.infrastructure.kafka.connector\",\"fields\":[{\"name\":\"XID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"INVOICE_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"INVOICE_DETAIL_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ADDITIONAL_INVL_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"COST_ALLOCATION_CHANGE_COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"com.vistajet.gvedge.infrastructure.kafka.connector.GVEDGE_FI_INVOICE_TX_INFO_V2\"}",
  "records": [
    {
      "key": {
          "long": {"long": 7}
       },
      "value": {
        "XID": {
          "long": 29524623
        },
        "ID": {
          "long": 140766
        },
        "INVOICE_CHANGE_COUNT": {
          "long": 1
        },
        "INVOICE_DETAIL_CHANGE_COUNT": {
          "long": 0
        },
        "ADDITIONAL_INVL_CHANGE_COUNT": {
          "long": 0
        },
        "COST_ALLOCATION_CHANGE_COUNT": {
          "long": 0
        }
      }
    }
  ]
}

在此处输入图像描述

现在回到关于“正在注册的模式与早期模式不兼容”的主要问题。

该消息解释了您正在尝试更改架构的自我。但是,您不能更改相同的内容,因为它与您当前版本的架构不兼容。

如果您确定它仅用于开发,那么您现在可以将模式兼容级别更改为“NONE”。

在此处输入图像描述

然后再试一次。

否则,请检查您在以前的密钥架构和新的密钥架构之间所做的更改,以及它不兼容的原因。

于 2020-04-08T13:32:59.793 回答