1

我正在尝试使用“kafka-avro-console-producer”将 json 转换为 avro 并将其发布到 kafka 主题。

我可以做那个扁平的 json/schema,但是对于下面给定的 schema 和 json,我得到“org.apache.avro.AvroTypeException:Unknown union branch EventId”错误。

任何帮助,将不胜感激。

架构:

{
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [{
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "Value",
            "fields": [{
                "name": "EventId",
                "type": "long"
            }, {
                "name": "CameraId",
                "type": ["null", "long"],
                "default": null
            }, {
                "name": "SiteId",
                "type": ["null", "long"],
                "default": null
            }],
            "connect.name": "CoreOLTPEvents.dbo.Event.Value"
        }],
        "default": null
    }, {
        "name": "after",
        "type": ["null", "Value"],
        "default": null
    }, {
        "name": "op",
        "type": "string"
    }, {
        "name": "ts_ms",
        "type": ["null", "long"],
        "default": null
    }],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}

Json 输入如下:

{
    "before": null,
    "after": {
        "EventId": 12,
        "CameraId": 10,
        "SiteId": 11974
    },
    "op": "C",
    "ts_ms": null
}

在我的情况下,我不能改变模式,我只能改变 json 这样它的工作方式

4

2 回答 2

2

如果您使用的是 Avro JSON 格式,则您的输入会稍有偏差。对于联合,需要指定非空值,以便列出类型信息:https ://avro.apache.org/docs/current/spec.html#json_encoding

请参阅下面的示例,我认为应该可行。

{
    "before": null,
    "after": {
        "CoreOLTPEvents.dbo.Event.Value": {
            "EventId": 12,
            "CameraId": {
                "long": 10
            },
            "SiteId": {
                "long": 11974
            }
        }
    },
    "op": "C",
    "ts_ms": null
}
于 2021-02-25T13:35:07.917 回答
0

删除"connect.name": "CoreOLTPEvents.dbo.Event.Value""connect.name": "CoreOLTPEvents.dbo.Event.Envelope"作为The RecordType can only contains {'namespace', 'aliases', 'fields', 'name', 'type', 'doc'} keys. 您可以尝试使用以下架构,看看您是否能够产生味精?

{
  "type": "record",
  "name": "Envelope",
  "namespace": "CoreOLTPEvents.dbo.Event",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "EventId",
              "type": "long"
            },
            {
              "name": "CameraId",
              "type": [
                "null",
                "long"
              ],
              "default": "null"
            },
            {
              "name": "SiteId",
              "type": [
                "null",
                "long"
              ],
              "default": "null"
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ]
}
于 2021-02-25T12:54:04.423 回答