问题标签 [spark-avro]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
207 浏览

apache-spark-sql - 加载 Avro 文件以创建数据框时出现 StackOverflowError

我在尝试加载 Avro 文件(大小 134 KB)时遇到此错误。我的 pom 依赖项如下。我正在从一个运行良好的 protobuf 消息创建这个 Avro。

pom依赖:

例外 :

0 投票
0 回答
1287 浏览

java - 如何将 GenericRecord 转换为与 Avro 中给出的模式相对应的 json 字符串

我有一个要求,我需要在 AWS S3 中以 json 格式存储数据,我们目前正在达到一个提供 List [GenericRecord] 的 enpoint,并且需要以 Json 格式存储,任何人都可以分享一个示例代码来实现这个。我无法将 GenericRecord 反序列化为 Json 字符串,甚至 ObjectMapper writeValueAsString 方法也无法处理它

0 投票
1 回答
446 浏览

avro - 使用 AVRO 对字段进行数据转换

我是 AVRO 的新手。我们已经开始使用 AVRO 模式来读取数据。

现在我们有一个用例,我需要在读取时截断数据。

假设我的 avro schcema 是这样的

现在数据是这样的。

当我读取数据时,我想截断 ProductID 字段。在上面的示例中,当我读取 ProductID 是 ABC1234567 时,我想将其截断为 5 个字符 ABC12

我可以在模式中指定任何东西来截断它吗?

0 投票
1 回答
463 浏览

python - scala 和 Python 之间的 Avro Kafka 转换问题

我们的项目有 scala 和 python 代码,我们需要向 kafka 发送/使用 avro 编码的消息。

我正在使用 python 和 scala 向 kafka 发送 avro 编码消息。我在 scala 代码中有生产者,它使用 Twitter 双射库发送 avro 编码消息,如下所示:

Avro 架构看起来像

}

我能够在 Scala 的 KafkaConsumer 中成功解码它

但是,我无法在 python 中解码消息我得到以下异常

python 代码如下所示: schema_path="avro/url_info_schema.avsc" schema = avro.schema.parse(open(schema_path).read())

scala avro 消费者也不理解 python avro 生产者消息。我有一个例外。Python Avro 生产者如下所示:

如何在 python 和 scala 中保持一致?任何指针都会很棒

0 投票
1 回答
1385 浏览

compression - 通过 PySpark 在 Avro 上启用压缩

使用 PySpark 我正在尝试使用压缩保存 Avro 文件(最好是 snappy)。

这行代码成功保存了一个 264MB 的文件:

df.write.mode('overwrite').format('com.databricks.spark.avro').save('s3n://%s:%s@%s/%s' % (access_key, secret_key, aws_bucket_name, output_file))

当我添加编解码器选项.option('codec', 'snappy')时,代码成功运行,但文件大小仍为 264MB:

df.write.mode('overwrite').option('codec', 'snappy').format('com.databricks.spark.avro').save('s3n://%s:%s@%s/%s' % (access_key, secret_key, aws_bucket_name, output_file))

我也尝试过'SNAPPY''Snappy'它也成功运行,但文件大小相同。

我已经阅读了文档,但它侧重于 Java 和 Scala。这在 PySpark 中不支持吗,Snappy 是默认的并且没有记录,还是我没有使用正确的语法?还有一个相关的问题(我假设),但它专注于 Hive 并且没有答案。

TIA

0 投票
1 回答
4185 浏览

scala - 如何将 Avro 中的字节列(逻辑类型为十进制)转换为十进制?

我有一个十进制列“TOT_AMT”,在我的 avro 模式中定义为类型“字节”和逻辑类型“十进制”。

在使用 databricks spark-avro 在 spark 中创建数据框后,当我尝试使用 sum 函数对TOT_AMT列求和时,它会抛出"Function sum requires numeric types not Binarytype" 错误

该列在 avro 模式中定义如下,

name="TOT_AMT","type":["null",{ "type":"bytes","logicaltype":"decimal","precision":20,"scale":10}]

我正在创建数据框并总结如下,

在创建数据帧时,十进制值似乎被读取为 Binarytype。在这种情况下,我们如何对这些十进制列执行数字运算?是否可以将这个 Byte 数组转换为 BigDecimal,然后进行计算。

0 投票
1 回答
204 浏览

mysql - Avro tojson 日期格式

我使用 sqoop 将带有选定列的表导入到 avro 文件格式。使用 avro-tools tojson 日期以奇怪的格式出现(负数)。我怎样才能解码日期?

MySQL 查询的正确格式在哪里

0 投票
1 回答
880 浏览

scala - IncompatibleSchemaException:以 Avro 格式序列化时出现意外类型 VectorUDT

我正在使用 Spark Mllib 为我的数据生成预测,然后以 Avro 格式将它们存储到 HDFS:

我收到以下异常:

我的理解是“预测”列格式不能序列化为 Avro。

  • 如何将 VectorUDT 转换为数组,以便在 Avro 中对其进行序列化?
  • 有没有更好的选择(我无法摆脱 Avro 格式)?
0 投票
1 回答
413 浏览

java - Spark 1.6 在数据帧保持分区字段中加载特定分区

我们有一个这样分区的 avro:

我们希望从保留分区列 a 的单个分区加载数据。我发现了这个 stackoverflow 问题,并应用了建议的代码段:

但是当我尝试阅读该字段时它会说:

在 Spark Java API 1.6 中是否可行?

0 投票
0 回答
129 浏览

python-3.x - AvroTypeException:在 python3 中编写时

我的avsc文件如下:

我能够解析这个,但是当我尝试如下写这个时,我一直遇到问题。我错过了什么?这是在python3中。我验证它也是格式正确的 json。

我在这里想念什么?