4

我有一组基于 Avro 的配置单元表,我需要从中读取数据。由于 Spark-SQL 使用 hive serdes 从 HDFS 读取数据,它比直接读取 HDFS 慢得多。所以我使用数据块 Spark-Avro jar 从底层 HDFS 目录读取 Avro 文件。

一切正常,除非桌子是空的。我已设法使用以下命令从 hive 表的 .avsc 文件中获取架构,但出现错误“未找到 Avro 文件

val schemaFile = FileSystem.get(sc.hadoopConfiguration).open(new Path("hdfs://myfile.avsc"));

val schema = new Schema.Parser().parse(schemaFile);

spark.read.format("com.databricks.spark.avro").option("avroSchema", schema.toString).load("/tmp/myoutput.avro").show()

解决方法:

我在该目录中放置了一个空文件,并且同样的工作正常。

有没有其他方法可以达到同样的效果?像conf设置什么的?

4

4 回答 4

6

你不需要使用emptyRDD。以下是 PySpark 2.4 对我有用的:

empty_df = spark.createDataFrame([], schema) # spark is the Spark Session

如果您已经有来自另一个数据框的架构,则可以这样做:

schema = some_other_df.schema

如果不这样做,则手动创建空数据框的架构,例如:

schema = StructType([StructField("col_1", StringType(), True),
                     StructField("col_2", DateType(), True),
                     StructField("col_3", StringType(), True),
                     StructField("col_4", IntegerType(), False)]
                     )

我希望这有帮助。

于 2019-05-08T20:08:21.613 回答
5

类似于 EmiCareOfCell44 的回答,只是更优雅一点,更“空”

val emptySchema = StructType(Seq())
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
                emptySchema)
于 2018-11-11T17:10:47.897 回答
4

创建一个空的 DataFrame:

val my_schema = StructType(Seq(
    StructField("field1", StringType, nullable = false),
    StructField("field2", StringType, nullable = false)
  ))

val empty: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], my_schema)

也许这可能会有所帮助

于 2018-05-30T15:06:38.183 回答
1

根据您的 Spark 版本,您可以使用反射方式.. SchemaConverters 中有一个私有方法可以将 Schema 转换为 StructType.. (不知道为什么它是私有的,说实话,它真的很有用在其他情况下)。使用 scala 反射,您应该能够通过以下方式进行操作

import scala.reflect.runtime.{universe => ru}
import org.apache.avro.Schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

var schemaStr = "{\n \"type\": \"record\",\n \"namespace\": \"com.example\",\n \"name\": \"FullName\",\n \"fields\": [\n { \"name\": \"first\", \"type\": \"string\" },\n      { \"name\": \"last\", \"type\": \"string\" }\n  ]\n }"
val schema = new Schema.Parser().parse(schemaStr);

val m = ru.runtimeMirror(getClass.getClassLoader)
val module = m.staticModule("com.databricks.spark.avro.SchemaConverters")
val im = m.reflectModule(module)
val method = im.symbol.info.decl(ru.TermName("toSqlType")).asMethod

val objMirror = m.reflect(im.instance)
val structure = objMirror.reflectMethod(method)(schema).asInstanceOf[com.databricks.spark.avro.SchemaConverters.SchemaType]
val sqlSchema = structure.dataType.asInstanceOf[StructType]
val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], sqlSchema)

empty.printSchema
于 2018-05-31T15:44:29.693 回答