问题标签 [spark-avro]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
474 浏览

spark-dataframe - Spark CodeGenerator 编译失败,得到 NPE,很少发生

我正在做简单的 spark 聚合操作,从 avro 文件中读取数据作为数据帧,然后使用 rdd.map 方法将它们映射到案例类,然后进行一些聚合操作,比如 count 等。大多数时候它工作得很好。但有时它会产生奇怪的 CodeGen 异常;

我正在使用此代码;

我无法重现问题。但我在生产中不定期地得到它。我正在使用 java-app 并获取 spark-core_2.11:2.1.0 和 spark-avro_2.11:3.1.0 maven 坐标。

问题可能出在哪里,我在运行应用程序时正在设置 java -Xms8G -Xmx12G -XX:PermSize=1G -XX:MaxPermSize=1G 。

0 投票
2 回答
1044 浏览

apache-spark - NoSuchMethodError 使用 Databricks Spark-Avro 3.2.0

我有一个 spark master & worker 在 Docker 容器中运行,带有 spark 2.0.2 和 hadoop 2.7。我正在尝试通过运行从不同的容器(相同的网络)提交来自 pyspark 的作业

但我收到了这个错误:

如果我尝试以交互方式或使用 spark-submit 没有区别。这些是我在 spark 中加载的包:

spark-submit --version输出:

斯卡拉版本是 2.11.8

我的 pyspark 命令:

我的火花提交命令:

我在这里读到这可能是由“正在使用旧版本的 avro”引起的,所以我尝试使用 1.8.1,但我一直收到同样的错误。阅读 avro 工作正常。有什么帮助吗?

0 投票
1 回答
530 浏览

scala - 如何将 databricks avro jar 添加到 hdinsight

我目前正在尝试使用外部库 spark-avro 在我们的 HDInsight 群集上运行 Spark Scala 作业,但没有成功。有人可以帮我解决这个问题吗?目标是找到能够读取驻留在 HDInsight 群集上的 Azure blob 存储上的 avro 文件的必要步骤。

当前规格:

  • Linux 上的 Spark 2.0 (HDI 3.5) 集群类型
  • 斯卡拉 2.11.8
  • spark-assembly-2.0.0-hadoop2.7.0-SNAPSHOT.jar
  • 火花-avro_2.11:3.2.0

使用的教程:https ://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-intellij-tool-plugin

火花斯卡拉代码:

基于以下示例:https ://github.com/databricks/spark-avro

收到错误:

0 投票
1 回答
6324 浏览

json - 如何在 Spark 中将复杂的 json 或复杂对象保存为 Parquet?

我是 Spark 的新手,我试图弄清楚是否有办法将复杂的对象(嵌套)或复杂的 json 保存为 Spark 中的 Parquet。我知道 Kite SDK,但我知道它使用 Map/Reduce。

我环顾四周,但找不到解决方案。

谢谢你的帮助。

0 投票
1 回答
3151 浏览

java - 如何使用 Spark 从 csv 文件写入 avro 文件?

当我尝试从 csv 文件创建的 DF 写入 avro 文件时,我遇到了 NullPointerException:

我的 pom.xml :

和异常堆栈跟踪:

我不知道我做错了什么?也许依赖关系不正确?或者这只是我做的一个坏习惯?

npe在这里:DataFrameWriter<Row> format = write.format("com.databricks.spark.avro"); format.save("C:\\git\\sparkCsvToAvro\\src\\main\\resources\\avro");

“格式”为空,我不知道为什么?

0 投票
0 回答
1376 浏览

apache-spark-sql - 在 Spark 2.1 中使用结构化流从 Kafka 读取 Avro 消息

我在此线程上关注@Ralph Gonzalez 的消息,使用 Spark 2.1 中的结构化流从 Kafka 读取 Avro 消息,但出现以下错误。

我在这里遇到了@Michael G. Noll 的帖子,它建议使用 DataFileReader 而不是 binaryDecoder,如下所示。

我尝试在 Scala 中使用它,但没有成功。下面是代码的当前状态。

def main(args: Array[String]) {

我的架构和案例类如下所示

我实际上已经在这上面花了两天时间,所以任何帮助都将受到高度赞赏。谢谢。

0 投票
2 回答
746 浏览

hive - 用于存储海量数据和实时更新的非 HBase 解决方案

嗨,我已经开发了一个应用程序,我必须第一次存储 TB 数据,然后以 xml 的形式每月存储 20 GB 增量,例如插入/更新/删除,这将应用于这 5 TB 数据之上。最后,根据请求,我必须生成所有数据的完整快照并根据逻辑创建 5K 文本文件,以便相应的数据应位于相应的文件中。

我已经使用 HBase 完成了这个项目。我在 HBase 中创建了 35 个表,区域从 10 到 500 。我在我的 HDFS 中有我的数据,并使用 mapreduce 将数据批量加载到接受 Hbase 表中。

之后,我用 Java 编写了 SAX 解析器应用程序来解析所有传入的 xml 增量文件并更新 HBase 表。xml 文件的频率约为每分钟 10 个 xml 文件,总共更新 2000 次。增量消息严格按顺序排列。

最后根据请求,我运行我的最后一个 mapreduce 应用程序来扫描所有 Hbase 表并创建 5K 文本文件并将其交付给客户端。

所有 3 个步骤都运行良好,但是当我在共享集群的生产服务器上部署我的应用程序时,基础架构团队不允许我们运行我的应用程序,因为我在 HBase 上进行全表扫描。

我使用了 94 个节点的集群,我拥有的最大的 HBase 表数据约为 20 亿。所有其他表的数据都少于一百万。

mapreduce 扫描和创建文本文件的总时间需要 2 小时。

现在我正在寻找其他一些解决方案来实现这一点。

我可以使用 HIVE,因为我也有记录级别的插入/更新,并以非常精确的方式删除它。

我还集成了 HBase 和 HIVE 表,因此对于增量数据,将使用 HBase 表,而对于全表扫描,将使用 HIVE。但是由于 HIVE 使用 Hbase 存储处理程序,我无法在 HIVE 表中创建分区,这就是为什么 HIVE 全表扫描变得非常非常慢甚至比 HBase 全表扫描慢 10 倍的原因

我现在想不出任何解决方案。请帮助我解决不涉及 HBase 的其他解决方案。

我可以在这个用例中使用 AVRO 或 perquet 文件吗?但我不确定 AVRO 将如何支持记录级别更新。

0 投票
1 回答
6427 浏览

apache-spark - 如何将字节从 Kafka 转换为其原始对象?

我从 Kafka 获取数据,然后反序列化Array[Byte]使用默认解码器,之后我的 RDD 元素看起来像(null,[B@406fa9b2)(null,[B@21a9fe0)但我想要具有模式的原始数据,那么我该如何实现呢?

我以 Avro 格式序列化消息。

0 投票
2 回答
1024 浏览

apache-spark - 在 Spark 中,如何将多个数据帧转换为 avro?

我有一个 Spark 作业,将一些数据处理成几个单独的数据帧。我将这些数据帧存储在一个列表中,即数据帧[]。最终,我想将这些数据帧组合成分层格式,并将输出写入 avro。avro 架构是这样的:

可以推断,每个数据帧都有三个字段,field1、field2 和 field3,我想将它们作为数组写入 avro 文件中。还有一些与每个数据帧相关的元数据。

我目前的做法是,一旦处理完这些数据,将数据帧写入 S3,然后使用单独的程序从 S3 中提取这些数据,使用 avro 库编写 avro 文件,然后再次将其上传到 S3。

但是,随着数据量的增长,这变得非常缓慢。我已经查看了 databricks 库以直接编写 avro 文件,但我不知道如何在内存中将数据帧组合在一起,或者 databricks 库如何确定我正在使用的架构。

在 Spark 中是否有惯用的方法来做到这一点?

PS 我在 Python 中使用 EMR 和 Spark 2.0.0。

0 投票
3 回答
5341 浏览

apache-spark - 将 org.apache.avro.generic.GenericRecord 转换为 org.apache.spark.sql.Row

我有 的列表org.apache.avro.generic.GenericRecordavro schema使用我们需要在APIdataframe的帮助下创建它,来创建它需要和。创建 DF 的先决条件是我们应该有 org.apache.spark.sql.Row 的 RDD,它可以使用下面的代码来实现,但有些它不工作并给出错误,示例代码。SQLContextdataframeRDDorg.apache.spark.sql.Rowavro schema

但它在创建DataFrame. 有人可以帮我看看上面的代码有什么问题吗?除此之外,如果有人有不同的逻辑来转换和创建dataframe.

每当我在 Dataframe 上调用任何操作时,它都会执行 DAG 并尝试创建 DF 对象,但在此它失败并出现以下异常

在此之后,我试图在 spark submit 的 jar 参数中提供正确的版本 jar,并将其他参数作为 --conf spark.driver.userClassPathFirst=true 但现在它与 MapR 一样失败

我们正在使用 MapR 分发,并且在 spark-submit 中更改类路径后,它因上述异常而失败。

有人可以在这里提供帮助,或者我的基本需要将 Avro GenericRecord 转换为 Spark Row,以便我可以用它创建 Dataframe,请帮助
谢谢。