I am running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0, and fastavro 1.4.1.

The following code uses an Avro schema encoder to encode a message, and it only succeeds if we first transform the schema returned by the registry by stripping out the MappingProxyType instances:

from confluent_kafka      import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer

from fastavro.schema import parse_schema
from fastavro.validation import validate

from types  import MappingProxyType
from typing import Any

import sys

def transformMap(item: Any) -> Any:
    if type(item) in {dict, MappingProxyType}:
        return {k:transformMap(v) for k,v in item.items()}
    elif type(item) is list:
        return [transformMap(v) for v in item]
    else:
        return item

def main(argv = None):
    msgType = 'InstrumentIdMsg'
    idFigi  = 'BBG123456789'
    head = {'sDateTime': 1, 'msgType': msgType,      'srcSeq': 1,
            'rDateTime': 1, 'src':     'Brownstone', 'reqID':  None,
            'sequence':  1}
    msgMap = {'head': head, 'product':  'Port', 'idIsin': None, 'idFigi': idFigi,
              'idBB': None, 'benchmark': None,  'idCusip': None,'idCins': None}

    registryClient = CachedSchemaRegistryClient(url = 'http://local.KafkaRegistry.com:8081')
    schemaId, schema, version = registryClient.get_latest_schema(msgType)

    serializer = MessageSerializer(registry_client = registryClient)

    schemaMap = schema.to_json()

    # NOTE:
    # schemaMap cannot be used since it uses mappingproxy
    # which causes validate() and parse_schema() to throw
    schemaDict = transformMap(schemaMap)

    isValid = validate(datum = msgMap, schema = schemaDict, raise_errors = True)
    parsed_schema = parse_schema(schema = schemaDict)

    msg = serializer.encode_record_with_schema_id(schema_id = schemaId,
                                                  record    = msgMap)

    producer = Producer({'bootstrap.servers': 'kafkaServer:9092'})
    producer.produce(key = idFigi,
                     topic = 'TOPIC_NAME',
                     value = msg)
    return 0


if __name__ == '__main__':
    sys.exit(main())

The transformation leaves everything essentially unchanged except that MappingProxyType instances are converted to dict.
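
Part of the reason, I suspect (this is an assumption on my part, not something I have verified against fastavro's source), is that mappingproxy is not a dict subclass, so isinstance(..., dict) checks on the schema nodes would not match it. A quick check illustrates the difference:

from types import MappingProxyType

proxy = MappingProxyType({'type': 'string'})

print(isinstance(proxy, dict))  # False: mappingproxy is not a dict subclass
print(dict(proxy))              # {'type': 'string'}: a shallow copy is a plain dict,
                                # but nested proxies still need the recursive transformMap walk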

  1. Is there a problem with the way I am calling the standard library that causes the mapping proxy to be used, which in turn causes fastavro to throw? Can this be fixed on the user side, or is it really a bug in the Confluent Kafka library? (The only user-side alternative I have come up with is sketched after this list.)
  2. Also, the schemaId returned by registryClient.get_latest_schema() is documented as returning str, but it actually returns an int. If I understand correctly, that is the expected input for the schema_id parameter of serializer.encode_record_with_schema_id() (and it works fine when I call it that way), and that parameter is documented as int. Is this a typo in the documentation? In other words, it seems that either registryClient.get_latest_schema() should return an integer, or serializer.encode_record_with_schema_id() should accept a string, or I am doing something wrong :) Which one is it?
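
For question 1, the only user-side alternative to transformMap I can think of is a JSON round trip. This is just a sketch, using the names from the code above, and it assumes that str(schema) on the avro-python3 schema object returns the schema's JSON text (that appears to be the case for the schemas I get back from the registry):

import json

# Assumed alternative to transformMap: dump the schema to its JSON text and
# re-load it, which yields plain dicts and lists with no mappingproxy inside.
schemaDict = json.loads(str(schema))

isValid = validate(datum = msgMap, schema = schemaDict, raise_errors = True)
parsed_schema = parse_schema(schema = schemaDict)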

Many thanks.
