0

我在 kafka 主题中注册了一个 AVRO 模式,并试图向它发送数据。该架构具有嵌套记录,我不确定如何使用 confluent_kafka python 正确地将数据发送给它。

示例架构: *ingore 架构中的任何拼写错误(真实的非常大,只是一个示例)

 {
 "namespace": "company__name",
 "name": "our_data",
 "type": "record",
 "fields": [
           {
            "name": "datatype1",
            "type": ["null", {
                 "type": "record",
                 "name": "datatype1_1",
                 "fields": [ 
                     {"name": "site", "type": "string"},
                     {"name": "units", "type": "string"}
                  ]
             }]
             "default": null
            }
            {
            "name": "datatype2",
            "type": ["null", {
                 "type": "record",
                 "name": "datatype2_1",
                 "fields": [ 
                     {"name": "site", "type": "string"},
                     {"name": "units", "type": "string"}
                  ]
             }]
             "default": null
            }
           ]
          }

我正在尝试使用 confluent_kafka python 版本将数据发送到此模式。当我之前完成此操作时,记录没有嵌套,我将使用典型的字典key: value对并将其序列化。如何发送嵌套数据以使用模式。

到目前为止我尝试过的...

message = {'datatype1': 
            {'site': 'sitename',
             'units': 'm'
            }
           }

此版本不会导致任何 kafka 错误,但所有列都显示为 null

和...

message = {'datatype1': 
            {'datatype1_1':
              {'site': 'sitename',
               'units': 'm'
              }
            }
           }

此版本在架构中产生了 kafka 错误。

4

1 回答 1

0

如果使用命名空间,则不必担心命名冲突,并且可以正确构建可选记录:例如,两者

{
  "meta": {
    "instanceID": "something"
  }
}

{}

是以下的有效实例:

{
  "doc": "Survey",
  "name": "Survey",
  "type": "record",
  "fields": [
    {
      "name": "meta",
      "type": [
        "null",
        {
          "name": "meta",
          "type": "record",
          "fields": [
            {
              "name": "instanceID",
              "type": [
                "null",
                "string"
              ],
              "namespace": "Survey.meta"
            }
          ],
          "namespace": "Survey"
        }
      ],
      "namespace": "Survey"
    }
  ]
}
于 2021-01-19T18:21:34.843 回答