0

我需要为以下数据编写 Avro 模式。曝光是一组具有 3 个数字的数组。

{
"Response": {
    "status": "",
    "responseDetail": {
        "request_id": "Z618978.R",
        "exposure": [
            [
                372,
                20000000.0,
                31567227140.238808
            ]
            [
                373,
                480000000.0,
                96567227140.238808
            ]
            [
                374,
                23300000.0,
                251567627149.238808
            ]
        ],
        "product": "ABC",
    }
}
}

所以我想出了一个如下的模式:

{
"name": "Response",
"type":{
    "name": "algoResponseType",
    "type": "record",
    "fields":
    [
            {"name": "status", "type": ["null","string"]},
            {
            "name": "responseDetail",
            "type": {
                    "name": "responseDetailType",
                    "type": "record",
                    "fields":
                    [
                            {"name": "request_id", "type": "string"},
                            {
                            "name": "exposure",
                            "type": {
                                    "type": "array",
                                    "items":
                                    {
                                    "name": "single_exposure",
                                    "type": {
                                            "type": "array",
                                            "items": "string"
                                    }
                                    }
                            }
                            },
                            {"name": "product", "type": ["null","string"]}
                    ]
            }
            }
    ]
   }
}

当我尝试注册架构时。我收到以下错误。TypeError: unhashable type: 'dict' 这意味着我使用列表作为字典键。

Traceback (most recent call last):
  File "sa_publisher_main4test.py", line 28, in <module>
    schema_registry_client)
  File "/usr/local/lib64/python3.6/site-packages/confluent_kafka/schema_registry/avro.py", line 175, in __init__
    parsed_schema = parse_schema(schema_dict)
  File "fastavro/_schema.pyx", line 71, in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx", line 204, in fastavro._schema._parse_schema
TypeError: unhashable type: 'dict'

任何人都可以帮助指出导致错误的原因吗?

4

2 回答 2

1

您收到的错误是因为架构注册表不接受您的架构。您的顶部元素必须是带有“响应”字段的记录。

这个模式应该可以工作,我改变了数组项类型,因为在你的消息中你有浮点数而不是字符串。

{
    "type": "record",
    "name": "yourMessage",
    "fields": [
        {
            "name": "Response",
            "type": {
                "name": "AlgoResponseType",
                "type": "record",
                "fields": [
                    {
                        "name": "status",
                        "type": [
                            "null",
                            "string"
                        ]
                    },
                    {
                        "name": "responseDetail",
                        "type": {
                            "name": "ResponseDetailType",
                            "type": "record",
                            "fields": [
                                {
                                    "name": "request_id",
                                    "type": "string"
                                },
                                {
                                    "name": "exposure",
                                    "type": {
                                        "type": "array",
                                        "items": {
                                            "type": "array",
                                            "items": "float"
                                        }
                                    }
                                },
                                {
                                    "name": "product",
                                    "type": [
                                        "null",
                                        "string"
                                    ]
                                }
                            ]
                        }
                    }
                ]
            }
        }
    ]
}

您的消息不正确,因为数组元素之间应该有逗号。

{
    "Response": {
        "status": "",
        "responseDetail": {
            "request_id": "Z618978.R",
            "exposure": [
                [
                    372,
                    20000000.0,
                    31567227140.238808
                ],
                [
                    373,
                    480000000.0,
                    96567227140.238808
                ],
                [
                    374,
                    23300000.0,
                    251567627149.238808
                ]
            ],
            "product": "ABC",
        }
    }
}

当您使用 fastavro 时,我建议您运行此代码来检查您的消息是否是模式的示例。

from fastavro.validation import validate
import json

with open('schema.avsc', 'r') as schema_file:
    schema = json.loads(schema_file.read())

message = {
    "Response": {
        "status": "",
        "responseDetail": {
            "request_id": "Z618978.R",
            "exposure": [
                [
                    372,
                    20000000.0,
                    31567227140.238808
                ],
                [
                    373,
                    480000000.0,
                    96567227140.238808
                ],
                [
                    374,
                    23300000.0,
                    251567627149.238808
                ]
            ],
            "product": "ABC",
        }
    }
}

try:
    validate(message, schema)
    print('Message is matching schema')
except Exception as ex:
    print(ex)
于 2020-08-21T19:52:10.873 回答
1

有几个问题。

首先,在架构的最顶层,您拥有以下内容:

{
  "name": "Response",
  "type": {...}
}

但这是不对的。顶层应该是一个记录类型,其中包含一个名为 的字段Response。所以它应该是这样的:

{
  "name": "Response",
  "type": "record",
  "fields": [
    {
      "name": "Response",
      "type": {...}
    }
  ]
}

第二个问题是,对于数组数组,您目前有以下内容:

{
   "name":"exposure",
   "type":{
      "type":"array",
      "items":{
        "name":"single_exposure",
        "type":{
          "type":"array",
          "items":"string"
        }
     }
   }
}

但它应该看起来像这样:

{
   "name":"exposure",
   "type":{
      "type":"array",
      "items":{
        "type":"array",
        "items":"string"
     }
   }
}

修复这些之后,架构将能够被解析,但是您的数据包含一个浮点数组,并且您的架构说它应该是一个字符串数组的数组。因此,要么架构需要更改为浮动,要么数据需要是字符串。

作为参考,这是一个在解决这些问题后可以工作的示例脚本:

import fastavro

s = {
   "name":"Response",
   "type":"record",
   "fields":[
      {
         "name":"Response",
         "type": {
            "name":"algoResponseType",
            "type":"record",
            "fields":[
               {
                  "name":"status",
                  "type":[
                     "null",
                     "string"
                  ]
               },
               {
                  "name":"responseDetail",
                  "type":{
                     "name":"responseDetailType",
                     "type":"record",
                     "fields":[
                        {
                           "name":"request_id",
                           "type":"string"
                        },
                        {
                           "name":"exposure",
                           "type":{
                              "type":"array",
                              "items":{
                                "type":"array",
                                "items":"string"
                             }
                           }
                        },
                        {
                           "name":"product",
                           "type":[
                              "null",
                              "string"
                           ]
                        }
                     ]
                  }
               }
            ]
         }
      }
   ]
}

data = {
   "Response":{
      "status":"",
      "responseDetail":{
         "request_id":"Z618978.R",
         "exposure":[
            [
               "372",
               "20000000.0",
               "31567227140.238808"
            ],
            [
               "373",
               "480000000.0",
               "96567227140.238808"
            ],
            [
               "374",
               "23300000.0",
               "251567627149.238808"
            ]
         ],
         "product":"ABC"
      }
   }
}

parsed_schema = fastavro.parse_schema(s)
fastavro.validate(data, parsed_schema)
于 2020-08-21T20:09:13.890 回答