我正在使用 fastavro schemaless_reader 反序列化来自 Kafka 主题的 avro 消息。我注意到一个schema_latest
与上一条消息不兼容的问题,这不应该是这种情况,因为schema_latest
它只在以前的模式中添加了 2 个字段。
而且我可以使用schema_old
.
您能否帮助建议如何使用 fastavro 处理向后兼容的 avro 架构更改?非常感谢您的帮助!
schema_latest = {
"type":"record",
"name":"Test",
"namespace":"Test",
"fields":[
{
"name":"ActualDate",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"OriginalGroupId",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"RelatedPositionId",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"RelatedTradeId",
"type":[
"null",
"long"
],
"default":null
}
]
}
schema_old = {
"type":"record",
"name":"Test",
"namespace":"Test",
"fields":[
{
"name":"ActualDate",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"OriginalGroupId",
"type":[
"null",
"string"
],
"default":null
}
]
}
message = io.BytesIO(b"some byte message")
def deserialize_avro(stringio, s):
# Skip magic byte and 4-byte schema ID
stringio.seek(5)
try:
security = fastavro.schemaless_reader(stringio, s)
print(security)
except Exception as e:
print(e)
if __name__ == '__main__':
deserialize_avro(message, schema_latest)