问题标签 [fastavro]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 处理位于不同目录中并使用 python (fastavro) 相互引用的多个 AVRO (avsc 文件)
我在不同的目录中有两个 avsc 文件-
这是地址文件
这是客户文件,它是父(顶级)架构
当两个文件位于同一目录中时,我可以使用 fastavro 加载文件
但是当两个 avsc 文件位于不同的目录中时,它不起作用。如果两个 avsc 文件位于不同的目录中,我需要在文件中进行哪些更改以便快速 avro 可以加载架构
python - 使用数据流模板从 BigQuery 读取后,WriteToAvro 未将数据写入文件
一个月以来我一直在努力,但我无法使用 WriteToAvro 将数据写入 GCS Bucket。
我尝试了以下事情:
- 使用 avro.schema.Parse 读取 Schema -> 给出无法读取 shcema 的错误,JSON 应该是 str、bytes 或 bytearray 而不是 dict
- 当使用“”“schema_given_in_code”“”解决这个问题时。在运行代码时,我收到错误消息,说当 shecma 处于 avro 时正在使用 fastavro
- 当通过说 use_fastavro = False 解决该错误时,在 ImmutableDict {}
- 在定义 AvroFileSink() 后尝试使用 WriteToFiles(path=job_options.outputLocation, sink=sink) ,但再次只生成文件并没有根据Beam 流管道不将文件写入存储桶写入数据
- 尝试将读取的数据转换为 JSON,在 fastavro._write.Writer.write 文件“fastavro/_write.pyx”,第 581 行,在 fastavro._write.write_data 中得到类似文件“fastavro/_write.pyx”,第 335 行的错误文件“fastavro/_write.pyx”,第 276 行,在 fastavro._write.write_record AttributeError: 'str' object has no attribute 'get'
一切都在 Jupyter Lab 环境中运行。但是一旦我创建模板,它就会失败。不知道为什么。请有人帮我解决这个问题
编辑: 根据要求,从 DirectRunner 运行的 Jupyter 实验室代码:
json - JSON 的 AVRO 模式
我有一个像这样生成的 JSON。我想知道为此的 avro 模式是什么。数组列表中键值的数量不固定。有相关的帖子,但它们引用了键并且不会更改。在我的情况下,钥匙改变了。变量键的名称不断变化。
avro - confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161 : ValueError
我是 python 的新手,并试图使用 'confluent_kafka' 来生成 avro 消息。使用 'confluent_kafka.schema_registry.avro.AvroSerializer' 相同(参考:https ://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py )
它适用于带有 dict(json 转换为 dict) 输入的简单 avro 模式,但对于以下示例模式,我收到错误:
架构:
输入 JSON :
错误 :
ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111, 'CameraId': 222222222}} (type <class 'dict'>) do not match ['null', {'connect.name': 'CoreOLTPEvents.dbo.Event.Value', 'type': 'record', 'name': 'CoreOLTPEvents.dbo.Event.Value', 'fields': [{'name': 'EventId', 'type': 'long'}, {'default': None, 'name': 'CameraId', 'type': ['null', 'long']}]}] on field before
'before' 字段类型是联合(['null',record]),如果我将其更改为仅记录(删除联合),那么它可以正常工作。但是我需要调整我的输入,使其适用于给定的模式。
(注意:我正在使用 'json.load(json_file)' 读取 json 输入,因此它提供了 dict 输出)
任何帮助将非常感激。
更新:实际大架构:
大型模式的输入:
错误 :
python - 如何在 avro 序列化期间在 Python 中设置具有十进制值比例和精度的 avro 'bytes' 字段值?
我正在尝试使用我拥有的模式向 kafka 生成一条 avro 消息。(使用 confluent-kafka python 包生产者)
生产者工作正常,除了“字节”字段值,它没有在消费者端正确反序列化。这些“字节”字段的值实际上是十进制值,必须设置比例和精度。
我可以在 Scala 中做到这一点,下面是 Scala 的代码,我正在寻找 Python。
架构(仅特定字段):
斯卡拉实现:
谢谢
python - Confluent Kafka python模式解析器导致与fastavro冲突
我正在使用 Confluent Kafka 1.7.0、avro-python3 1.10.0 和 fastavro 1.4.1 运行 Python 3.9。
以下代码使用 Avro 模式编码器来对消息进行编码,只有当我们通过去掉 来转换生成的模式编码时才会成功MappingProxyType
:
MappingProxyType
除了更改为dict
实例之外,转换基本上保持一切不变。
- 我调用标准库的方式是否存在问题导致使用映射代理,进而导致
fastavro
抛出?这可以由用户修复吗,或者这真的是 Confluent Kafka 库中的错误吗? - 此外,输出
schemaId
fromregistryClient.get_latest_schema()
在文档中标记为 returnstr
但返回int
。schema_id
如果我理解正确,这是对参数的预期输入serializer.encode_record_with_schema_id()
(如果我调用它,它可以正常工作),它也被标记为int
. 这是文档中的错字吗?换句话说,它似乎registryClient.get_latest_schema()
应该返回一个整数,或者serializer.encode_record_with_schema_id()
应该接受一个字符串,或者我做错了什么:) 它是哪一个?
非常感谢。
python - 反序列化,Avro 中的固定数据类型
我是 avro 的新手,我有一个要反序列化的 avro 文件。一些模式使用固定类型的数据来存储 MAC 地址。下面的模式是这些模式之一,并在不同的模式中用作一种类型。
MAC 地址的架构如下:
我使用以下方法将数据的第一条记录写入文本文件:
上述 MAC 地址出现在反序列化数据中,例如:
我知道 \x 表示以下是十六进制值。所以这应该是“36:e9:ad:64:2d:3d”,对吧?“b”“”样式值是固定类型的预期输出吗?
此外,一些值如下所示:
为什么这些是 MAC 地址?j, % 字符是什么意思?
hadoop - 如何将超过 5 GB 的大文件序列化为 avro?
我想将一个大约 15 GB 的 xml 文件序列化为 avro 并使用 python 3.6 存储在 hadoop 中。我的方法是使用 xml.minidom 在字典类型的变量中加载数据,然后将其保存到 avro 文件。虽然这对于几 kb 大小的示例 xml 文件非常有效,但我仍然可以将整个大 xml 数据存储到该变量吗?我想这种方法存在一些记忆挑战?处理这种情况的最佳方法是什么?
avro - avro 架构时间戳格式
我希望以这种格式获取时间戳:MMDDYYYYHHMMSS
对于 avro 模式格式,我可以使用:
还是有更好的方法来做到这一点?