0

我正在使用 fastavro schemaless_reader 反序列化来自 Kafka 主题的 avro 消息。我注意到一个schema_latest与上一条消息不兼容的问题,这不应该是这种情况,因为schema_latest它只在以前的模式中添加了 2 个字段。

而且我可以使用schema_old.

您能否帮助建议如何使用 fastavro 处理向后兼容的 avro 架构更改?非常感谢您的帮助!


        schema_latest = {
       "type":"record",
       "name":"Test",
       "namespace":"Test",
       "fields":[
          {
             "name":"ActualDate",
             "type":[
                "null",
                "int"
             ],
             "default":null
          },
          {
             "name":"OriginalGroupId",
             "type":[
                "null",
                "string"
             ],
             "default":null
          },
          {
             "name":"RelatedPositionId",
             "type":[
                "null",
                "int"
             ],
             "default":null
          },
          {
             "name":"RelatedTradeId",
             "type":[
                "null",
                "long"
             ],
             "default":null
          }
       ]
    }
    
      schema_old = {
       "type":"record",
       "name":"Test",
       "namespace":"Test",
       "fields":[
          {
             "name":"ActualDate",
             "type":[
                "null",
                "int"
             ],
             "default":null
          },
          {
             "name":"OriginalGroupId",
             "type":[
                "null",
                "string"
             ],
             "default":null
          }
       ]
    }
        
        message = io.BytesIO(b"some byte message")
        
        
        def deserialize_avro(stringio, s):
            # Skip magic byte and 4-byte schema ID
            stringio.seek(5)
            try:
                security = fastavro.schemaless_reader(stringio, s)
                print(security)
        
            except Exception as e:
                print(e)
        
        
        if __name__ == '__main__':
            deserialize_avro(message, schema_latest)

4

0 回答 0