I'm running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0 and fastavro 1.4.1.
The following code uses an Avro schema to encode a message. It only succeeds if I transform the returned schema to get rid of the `MappingProxyType` objects:
```python
from confluent_kafka import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer
from fastavro.schema import parse_schema
from fastavro.validation import validate
from types import MappingProxyType
from typing import Any
import sys


def transformMap(item: Any) -> Any:
    """Recursively copy dicts/mapping proxies into plain dicts and lists into new lists."""
    if type(item) in {dict, MappingProxyType}:
        return {k: transformMap(v) for k, v in item.items()}
    elif type(item) is list:
        return [transformMap(v) for v in item]
    else:
        return item


def main(argv=None):
    msgType = 'InstrumentIdMsg'
    idFigi = 'BBG123456789'
    head = {'sDateTime': 1, 'msgType': msgType, 'srcSeq': 1,
            'rDateTime': 1, 'src': 'Brownstone', 'reqID': None,
            'sequence': 1}
    msgMap = {'head': head, 'product': 'Port', 'idIsin': None, 'idFigi': idFigi,
              'idBB': None, 'benchmark': None, 'idCusip': None, 'idCins': None}

    registryClient = CachedSchemaRegistryClient(url='http://local.KafkaRegistry.com:8081')
    # get_latest_schema() takes the subject name and returns (schema_id, schema, version)
    schemaId, schema, version = registryClient.get_latest_schema(msgType)
    serializer = MessageSerializer(registry_client=registryClient)

    schemaMap = schema.to_json()
    # NOTE:
    # schemaMap cannot be used directly since it contains mappingproxy objects,
    # which cause validate() and parse_schema() to throw
    schemaDict = transformMap(schemaMap)
    isValid = validate(datum=msgMap, schema=schemaDict, raise_errors=True)
    parsed_schema = parse_schema(schema=schemaDict)

    msg = serializer.encode_record_with_schema_id(schema_id=schemaId,
                                                  record=msgMap)

    producer = Producer({'bootstrap.servers': 'kafkaServer:9092'})
    producer.produce(key=idFigi,
                     topic='TOPIC_NAME',
                     value=msg)
    # produce() is asynchronous; flush() blocks until queued messages are delivered
    producer.flush()
    return 0


if __name__ == '__main__':
    sys.exit(main())
```
The transformation essentially leaves everything unchanged, except that it converts `MappingProxyType` instances into `dict` instances.
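For context, `types.MappingProxyType` is a read-only view over a mapping, not a `dict` subclass, so any check of the form `isinstance(x, dict)` or `type(x) is dict` rejects it. The minimal sketch below shows that failure mode and a slightly more general variant of `transformMap` that accepts any `collections.abc.Mapping` and also handles tuples. It assumes the consuming code tests for `dict` specifically (not confirmed here against fastavro's source), and `to_plain` is a hypothetical name used only for illustration.

```python
from collections.abc import Mapping
from types import MappingProxyType
from typing import Any


def to_plain(item: Any) -> Any:
    """Hypothetical generalisation of transformMap.

    Any Mapping (dict, MappingProxyType, ...) becomes a plain dict,
    lists and tuples become lists, everything else is returned unchanged.
    """
    if isinstance(item, Mapping):
        return {k: to_plain(v) for k, v in item.items()}
    if isinstance(item, (list, tuple)):
        return [to_plain(v) for v in item]
    return item


# A schema-like structure whose nested parts are read-only proxies.
fields = [MappingProxyType({"name": "idFigi", "type": ["null", "string"]})]
proxy_schema = MappingProxyType(
    {"type": "record", "name": "InstrumentIdMsg", "fields": fields}
)

print(isinstance(proxy_schema, dict))    # False: a proxy is not a dict
plain = to_plain(proxy_schema)
print(isinstance(plain, dict))           # True
print(type(plain["fields"][0]) is dict)  # True: nested proxies are converted too
```

Checking against `Mapping` rather than a fixed set of types means other read-only or custom mapping types are converted as well.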
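- Is there a problem with the way I'm calling the standard library that results in mapping proxies being used, which in turn makes `fastavro` throw? Can this be fixed on the user side, or is it really a bug in the Confluent Kafka library?
- Also, the `schemaId` returned by `registryClient.get_latest_schema()` is documented as `str`, but it is actually an `int`. If I understand correctly, it is the intended input for the `schema_id` parameter of `serializer.encode_record_with_schema_id()` (which works fine when I call it), and that parameter is also documented as `int`. Is this a typo in the docs? In other words, it seems that either `registryClient.get_latest_schema()` should return an integer, or `serializer.encode_record_with_schema_id()` should accept a string, or I'm doing something wrong :) Which is it?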
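On the second question: Confluent's documented wire format frames each message as a magic byte `0`, then the schema ID as a 4-byte big-endian integer, then the Avro-encoded payload. That is consistent with `get_latest_schema()` returning an `int` and `encode_record_with_schema_id()` expecting one. The sketch below builds and parses only that 5-byte header using the standard library; `confluent_header` and `parse_header` are hypothetical helpers, not part of `confluent_kafka`, and are not claimed to mirror the library's internals.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def confluent_header(schema_id: int) -> bytes:
    """Hypothetical helper: magic byte + 4-byte big-endian unsigned schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id)


def parse_header(message: bytes) -> int:
    """Hypothetical helper: return the schema ID from the first 5 bytes."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id


header = confluent_header(42)
print(header)                # b'\x00\x00\x00\x00*'  (0x2a is '*')
print(parse_header(header))  # 42

try:
    confluent_header("42")   # a str ID cannot be packed as an unsigned int
except struct.error as exc:
    print("struct.error:", exc)
```

Because the ID ends up as a fixed-width binary integer, an `int` is the natural type at both ends; a `str` would have to be converted with `int(...)` first.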
Many thanks.