问题标签 [spark-avro]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
768 浏览

apache-spark - Spark AVRO S3 读取不适用于分区数据

当我阅读特定文件时,它可以工作:

但是,如果我指向一个文件夹来读取日期分区数据,它会失败:

val filePath="s3n://bucket_name/f1/f2/avro/dt=2016-10-19/"

我收到此错误:

我错过了什么吗?

0 投票
2 回答
786 浏览

apache-spark - 如何使用 spark 将 avro 写入多个输出目录

你好,有一个题目是关于在一个spark job中使用MultipleTextOutputFormat将文本数据写入多个输出目录

通过键 Spark 写入多个输出 - 一项 Spark 作业

我会问是否有一些类似的方法可以将 avro 数据写入多个目录

我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天进入同一目录)

0 投票
1 回答
992 浏览

apache-spark - Pyspark + Hive avro 表

我创建了 Hive avro 表,并尝试从 pyspark 中读取它。基本上试图在 pyspark 上对这个 Hive avro 表运行基本查询,以便进行一些分析。

但是,我收到以下错误。“flight”是 avro 模式中的嵌套记录。

谁能帮我解决这个问题?

编辑:这里是 avro 模式:

0 投票
0 回答
115 浏览

apache-spark - 通过 AvroBigQueryInputFormat 从 spark 读取 bq 表给出意外行为(使用 java)

示例骨架代码如下所示,我基本上是从 bigquery 读取 RDD 并选择 my_field_name 值为 null 的所有数据点

然而,输出 RDD 似乎完全出乎意料。特别是 my_field_name 的值似乎完全随机。经过一些调试,似乎过滤是按预期进行的,但问题在于我从GenericData.Record(基本上record.get(my_field_name))提取的值似乎完全随机。

因此,在我从 AvroBigQueryInputFormat 切换到 GsonBigQueryInputFormat 以读取 json 中的 bq 之后,这段代码似乎工作正常。

但是,理想情况下,我真的想改用 Avro(它应该比处理 json 快得多),但是它在我的代码中的当前行为完全令人不安。我只是使用 AvroBigQueryInputFormat 错误?

0 投票
0 回答
334 浏览

java - Avro - 代码生成方法与非代码生成方法

我是 Avro 的新手。官方文档表明使用 avro 有两种可能的方法;

  • 使用代码生成 - 这里的类是由 avro 编译器从 avro 模式文件自动生成的。然后在应用程序代码中使用这些类。

  • 无需代码生成 - 模式直接在应用程序中解析。应用程序不使用自动生成的类(如第一种方法),GenericRecord而是创建对象。

问题

  • 每种方法的优缺点是什么(例如:在性能、编写应用程序代码的难易程度等方面?

提前致谢。

0 投票
2 回答
3632 浏览

apache-spark - 在运行 Spark Streaming 应用程序时处理架构更改

我希望在 Spark 1.6 上使用 DataFrames API 构建一个 Spark Streaming 应用程序。在我深入兔子洞之前,我希望有人可以帮助我了解 DataFrames 如何处理具有不同模式的数据。

这个想法是消息将通过 Avro 模式流入 Kafka。我们应该能够以向后兼容的方式发展模式,而无需重新启动流应用程序(应用程序逻辑仍然有效)。

使用模式注册表和嵌入在消息中的模式 id 使用 KafkaUtils 创建直接流和 AvroKafkaDecoder(来自 Confluent)反序列化新版本的消息似乎很简单。这让我有一个 DStream。

问题 #1:在该 DStream 中会有不同版本的模式的对象。因此,当我将每个对象转换为 Row 对象时,我应该传入一个读取器模式,该模式是正确迁移数据的最新模式,并且我需要将最新模式传递给 sqlContext.createDataFrame(rowRdd, schema) 调用。DStream 中的对象是 GenericData.Record 类型,据我所知,没有简单的方法可以判断哪个是最新版本。我看到了 2 种可能的解决方案,一种是调用模式注册表以在每个微批次上获取最新版本的模式。另一种是修改解码器以附加模式ID。然后我可以遍历 rdd 以找到最高 id 并从本地缓存中获取模式。

我希望有人已经以可重用的方式很好地解决了这个问题。

问题/问题 #2:Spark 将为每个分区从 Kafka 拉取不同的执行程序。当一个执行者收到与其他执行者不同的“最新”模式时,我的应用程序会发生什么。由一个执行程序创建的 DataFrame 在同一时间窗口中将具有与另一个执行程序不同的架构。我实际上不知道这是否是一个真正的问题。我无法可视化数据流,以及什么样的操作会出现问题。如果这是一个问题,则意味着执行者之间需要共享一些数据,这听起来既复杂又低效。

我需要担心这个吗?如果我这样做,如何解决架构差异?

谢谢,--本

0 投票
0 回答
675 浏览

spark-streaming - 无法将 byte[] 转换为 scala 中的字符串

**我正在尝试从 kafka 流式传输数据并将其转换为数据帧。跟着这个链接

但是当我同时运行生产者和消费者应用程序时,这是我控制台上的输出。**

(0,[B@370ed56a) (1,[B@2edd3e63) (2,[B@3ba2944d) (3,[B@2eb669d1) (4,[B@49dd304c) (5,[B@4f6af565) (6 ,[B@7714e29e)

这实际上是 kafka 生产者的输出,在推送消息之前主题是空的。

这是生产者代码片段:

它的输出是:

key=0, value=[B@680387a key=1, value=[B@32bfb588 key=2, value=[B@2ac2e1b1 key=3, value=[B@606f4165 key=4, value=[B@282e7f59

这是我用 scala 编写的消费者代码片段,

我已经在 createStream() 中尝试过 StringDecoder 和 DefaultDecoder。我确信,生产者和消费者是相互遵守的。任何帮助,来自任何人?

0 投票
1 回答
5265 浏览

hadoop - 如何将镶木地板文件转换为 Avro 文件?

我是 hadoop 和大数据技术的新手。我喜欢将 parquet 文件转换为 avro 文件并读取该数据。我在几个论坛中搜索,它建议使用 AvroParquetReader。

但我不确定如何包含 AvroParquetReader。我根本无法导入它。

我可以使用 spark-shell 读取此文件,并可能将其转换为一些 JSON,然后可以将 JSON 转换为 avro。但我正在寻找一个更简单的解决方案。

0 投票
0 回答
478 浏览

scala - 如何读取大型 avro 文件

我正在尝试使用 spark-shell 读取一个大的 avro 文件(2GB),但我收到了 stackoverflow 错误。

我试图增加驱动程序内存和执行程序内存,但我仍然遇到同样的错误。

我怎样才能阅读这个文件?有没有办法对这个文件进行分区?

0 投票
2 回答
1548 浏览

apache-spark - 在 Spark 中将数据转换为 Parquet

我在 S3 中有一些遗留数据,我想使用 Spark 2 使用 Java API 将其转换为镶木地板格式。

我有所需的 Avro 模式(.avsc 文件)和它们使用 Avro 编译器生成的 Java 类,我想使用 Parquet 格式的这些模式存储数据。输入数据不是任何标准格式,但我有一个库,可以将旧文件中的每一行转换为 Avro 类。

是否可以将数据读取为JavaRDD<String>,使用库将转换应用于 Avro 类,最后以镶木地板格式存储。

就像是:

像上面这样的事情可行吗?稍后我想使用 Hive、Presto 和 Spark 处理转换后的 parquet 数据。