0

我有 的列表org.apache.avro.generic.GenericRecordavro schema使用我们需要在APIdataframe的帮助下创建它,来创建它需要和。创建 DF 的先决条件是我们应该有 org.apache.spark.sql.Row 的 RDD,它可以使用下面的代码来实现,但有些它不工作并给出错误,示例代码。SQLContextdataframeRDDorg.apache.spark.sql.Rowavro schema

 1. Convert GenericRecord to Row
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.avro.Schema
    import org.apache.spark.sql.types.StructType
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
    {
      val fields = avroSchema.getFields
      var rows = new Seq[Row]
      for (avroRecord <- genericRecords) {
        var avroFieldsSeq = Seq[Any]();
        for (i <- 0 to fields.size - 1) {
          avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
        }
        val avroFieldArr = avroFieldsSeq.toArray
        val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
        rows = rows :+ genericRow
      }
      return rows;
    }

2. Convert `Avro schema` to `Structtype`
   Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType

3. Create `Dataframe` using `SQLContext`
   val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
   val rowRdd = sc.parallelize(rowSeq, 1)
   val finalDF =sqlContext.createDataFrame(rowRDD,structType)

但它在创建DataFrame. 有人可以帮我看看上面的代码有什么问题吗?除此之外,如果有人有不同的逻辑来转换和创建dataframe.

每当我在 Dataframe 上调用任何操作时,它都会执行 DAG 并尝试创建 DF 对象,但在此它失败并出现以下异常

 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
 Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
                        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
                        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

在此之后,我试图在 spark submit 的 jar 参数中提供正确的版本 jar,并将其他参数作为 --conf spark.driver.userClassPathFirst=true 但现在它与 MapR 一样失败

ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
                    at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
                    at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
                    at java.lang.Class.forName0(Native Method)

我们正在使用 MapR 分发,并且在 spark-submit 中更改类路径后,它因上述异常而失败。

有人可以在这里提供帮助,或者我的基本需要将 Avro GenericRecord 转换为 Spark Row,以便我可以用它创建 Dataframe,请帮助
谢谢。

4

3 回答 3

4

也许这有助于稍后进入游戏的人。

由于spark-avro已弃用并且现在集成在 Spark 中,因此可以通过不同的方式来实现。

import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

...

val avroSchema = data.head.getSchema
val sparkTypes = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val converter = new AvroDeserializer(avroSchema, sparkTypes)
val enconder = RowEncoder.apply(sparkTypes).resolveAndBind()

val rows = data.map { record =>
    enconder.fromRow(converter.deserialize(record).asInstanceOf[InternalRow])
}

val df = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(rows), sparkTypes)
于 2020-01-31T08:17:19.357 回答
1

从 RDD[GenericRecord] 创建数据帧时,有几个步骤

  1. 首先需要将 org.apache.avro.generic.GenericRecord 转换为 org.apache.spark.sql.Row

使用 com.databricks.spark.avro.SchemaConverters.createConverterToSQL(sourceAvroSchema: Schema,targetSqlType: DataType)

这是 spark-avro 3.2 版本中的私有方法。如果我们的版本相同或小于 3.2,则将此方法复制到您自己的 util 类中并使用它,否则直接使用它。

  1. 从行(rowSeq)的集合创建数据框。

val rdd = ssc.sparkContext.parallelize(rowSeq,numParition) val dataframe = sparkSession.createDataFrame(rowRDD, schemaType)

这解决了我的问题。

于 2018-03-28T05:25:49.237 回答
0

希望这会有所帮助。在第一部分中,您可以了解如何从 GenericRecord 转换为 Row

如何将 RDD [GenericRecord] 转换为 scala 中的数据框?

于 2017-11-14T13:26:06.660 回答