问题标签 [spark-avro]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
150 浏览

spark-streaming - 如何从 kafka 读取 Avro Schema 类型的事件并将它们存储在 Hive 表中

我的想法是使用Spark Streaming + Kafka从 kafka 总线获取事件。在检索一批 avro 编码的事件后,我想用Spark Avro将它们转换为 SparkSQL 数据帧,然后将数据帧写入 Hive 表。

这种方法可行吗?我是 spark 新手,我不确定是否可以使用 Spark Avro 包来解码 Kafka 事件,因为在文档中只提到了 avro 文件。但到目前为止,我的理解是,这是可能的。

下一个问题是:如果可能的话,我的理解是,我有一个符合 SparkSQL 的 Dataframe,我可以将其写入配置单元表。我的假设正确吗?

提前感谢任何提示和提示。

0 投票
0 回答
390 浏览

scala - 使用 Java api 使用 Spark 从 parquet 读取/访问原始双数组

我有一个使用该parquet-avro库生成的 Parquet 文件,其中一个字段具有原始双精度数组,使用以下模式类型创建:

我从 Spark 中读取了这个 parquet 数据,并在其上应用了 UDAF(用户定义的聚合函数)。在 UDAForg.apache.spark.sql.expressions.UserDefinedAggregateFunction中,我试图从org.apache.spark.sql.Row对象访问该字段,该对象作为参数传递给函数public void update(MutableAggregationBuffer mutableAggBuff, Row dataRow)。但是,我无法访问原始双精度数组,而我得到的是一个数组,Double[]该数组是原始双精度的装箱对象表示。这是原始双精度数组数据的非常昂贵的对象转换。

当我检索双精度数组时,我得到的是盒装java.lang.Double数组,而不是原始双精度数组。在 parquet 阅读器代码的某处,原始数组正在转换为内存效率低的 Double 对象数组。如何防止这种昂贵的转换,并使原始双数组完好无损?我可以编写代码并将其转换回原始数组,但是已经创建了 Double 对象并且它给 VM 施加了 GC 压力。

唯一的 APIorg.apache.spark.sql.Row是:

我们需要一种double[]无需任何进一步转换即可获得原始数组的方法。例如:

问题:

  1. 我可以自定义 Hadoop-parquet 阅读器,以便我们可以将双数组读取为原始双数组吗?
  2. Spark/Parquet-Hadoop Reader 是否有任何方法可以在没有自定义代码的情况下做到这一点?
0 投票
0 回答
98 浏览

apache-spark-sql - Spark 1.6 - 使用数据帧失败的 avro 文件覆盖目录

我在 HDFS 中有一个目录,其中包含 avro 文件。当我尝试用数据框覆盖目录时,它失败了。语法:avroData_df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("") 错误是: Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path不存在:hdfs://nameservice1//part-r-00000-bca9a5b6-5e12-45c1-a877-b0f6d6cc8cd3.avro

在覆盖时,它似乎也在考虑 avro 文件。

我们可以使用 Spark 1.6 来做到这一点吗?

0 投票
1 回答
53 浏览

apache-spark-sql - 将数据从 HDFS/S3 移植到本地 FS 并在 Java 中加载

我在 EMr 集群上运行了一个 Spark 作业,该作业将 DataFrame 写入 HDFS(然后将其s3-dist-cp-ed 写入S3)。数据量不大(另存为 2 GB parquet)。然后将 S3 中的这些数据复制到本地文件系统(运行 Linux 的 EC2 实例),然后加载到 Java 应用程序中。

事实证明,我无法获取parquet格式数据,因为 parquet 是为 HDFS 设计的,不能在本地 FS 中使用(如果我错了,请指出有关如何在本地 FS 上读取 parquet 文件的资源)。

我可以使用什么其他格式来解决这个问题?Avro 是否足够紧凑,并且不会通过将模式与数据帧的每一行打包来破坏数据的大小?

0 投票
2 回答
1729 浏览

apache - Avro 消息中的模式

我看到 Avro 消息嵌入了架构,然后是二进制格式的数据。如果发送了多条消息并且为每条消息创建了新的 avro 文件,那么 Schema 不是嵌入开销吗?那么,这是否意味着,生产者对消息进行批处理然后写入总是很重要的,所以将多条消息写入一个 avro 文件,只携带一个模式?另一方面,是否有一个选项可以在使用 Generic/SpecificDatum 编写器进行序列化时消除模式嵌入?

0 投票
1 回答
260 浏览

apache-kafka - SCHEMA REGISTRY KAFKA:我如何将它集成到 java 项目中

在经历了几次关于模式注册的讲座并研究了它是如何工作的之后,我比以前更困惑了。

我想了解如何在我的 kafka 项目中包含模式注册表,在该项目中,我们在本地有一些生产者和一些消费者,它们与远程服务器的相应消费者/生产者打交道。

如果我理解正确,生产者将我的 avroFile 的 schemaId(具有此当前架构的版本)发布到架构注册表中,并在 kafka 队列上上传一些主题,其中 schemaID 在有效负载标头中。

之后,消费者将从队列中读取具有相同 schemaId 的主题(通过调用 API?)并且该主题将被消费。

我理解对了吗?你能给我解释一下吗,也许用图表模式?

非常感谢。斯特凡诺

0 投票
0 回答
618 浏览

scala - Spark avro 谓词下推

我们使用的是 Avro 数据格式,数据按年、月、日、小时、分钟进行分区

我看到存储在 HDFS 中的数据为

我们使用加载数据

然后使用谓词下推过滤数据 -

有人可以解释幕后发生的事情吗?我想具体了解输入文件的过滤何时发生以及在哪里发生?有趣的是,当我打印模式时,会自动添加字段年、月、日和小时,即实际数据不包含这些列。Avro 是否添加了这些字段?想清楚地了解如何过滤文件以及如何创建分区。

0 投票
0 回答
100 浏览

spark-streaming - 火花流中的 JsonDecoder 解析失败

我正在尝试解码作为我的 spark2.2 流中的 avro 消息的一部分出现的消息。我为此 json 定义了一个模式,并且每当 json 消息出现不尊重 json 模式时,我的 JsonDecoder 就会失败并出现以下错误

我知道杰克逊解码有一种方法可以忽略额外和缺失的字段。org.apache.avro.io.JsonDecoder 中是否有相同行为的方法?

0 投票
1 回答
108 浏览

java - 从原始 avro 模式创建对象

假设我在 avro 中有这样的模式

我应该如何在 java 中从这个模式创建对象?

0 投票
1 回答
876 浏览

apache-spark - 如何使用 Java 中的分区将 Avro 对象写入 Parquet?如何将数据附加到同一个镶木地板?

我正在使用 Confluent 的 KafkaAvroDerserializer 反序列化通过 Kafka 发送的 Avro 对象。我想将收到的数据写入 Parquet 文件。我希望能够将数据附加到同一个实木复合地板并创建一个带有分区的实木复合地板。

我设法用 AvroParquetWriter 创建了一个 Parquet - 但我没有找到如何添加分区或附加到同一个文件:

在使用 Avro 之前,我使用 spark 编写 Parquet - 使用 spark 编写带分区的 parquet 并使用附加模式很简单 - 我应该尝试从我的 Avro 对象创建 Rdds 并使用 spark 来创建 parquet 吗?