我在 kafka 主题中注册了一个 AVRO 模式,并试图向它发送数据。该架构具有嵌套记录,我不确定如何使用 confluent_kafka python 正确地将数据发送给它。
示例架构: *ingore 架构中的任何拼写错误(真实的非常大,只是一个示例)
{
"namespace": "company__name",
"name": "our_data",
"type": "record",
"fields": [
{
"name": "datatype1",
"type": ["null", {
"type": "record",
"name": "datatype1_1",
"fields": [
{"name": "site", "type": "string"},
{"name": "units", "type": "string"}
]
}]
"default": null
}
{
"name": "datatype2",
"type": ["null", {
"type": "record",
"name": "datatype2_1",
"fields": [
{"name": "site", "type": "string"},
{"name": "units", "type": "string"}
]
}]
"default": null
}
]
}
我正在尝试使用 confluent_kafka python 版本将数据发送到此模式。当我之前完成此操作时,记录没有嵌套,我将使用典型的字典key: value
对并将其序列化。如何发送嵌套数据以使用模式。
到目前为止我尝试过的...
message = {'datatype1':
{'site': 'sitename',
'units': 'm'
}
}
此版本不会导致任何 kafka 错误,但所有列都显示为 null
和...
message = {'datatype1':
{'datatype1_1':
{'site': 'sitename',
'units': 'm'
}
}
}
此版本在架构中产生了 kafka 错误。