我有 的列表org.apache.avro.generic.GenericRecord
,avro schema
使用我们需要在APIdataframe
的帮助下创建它,来创建它需要和。创建 DF 的先决条件是我们应该有 org.apache.spark.sql.Row 的 RDD,它可以使用下面的代码来实现,但有些它不工作并给出错误,示例代码。SQLContext
dataframe
RDD
org.apache.spark.sql.Row
avro schema
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
但它在创建DataFrame
. 有人可以帮我看看上面的代码有什么问题吗?除此之外,如果有人有不同的逻辑来转换和创建dataframe
.
每当我在 Dataframe 上调用任何操作时,它都会执行 DAG 并尝试创建 DF 对象,但在此它失败并出现以下异常
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
在此之后,我试图在 spark submit 的 jar 参数中提供正确的版本 jar,并将其他参数作为 --conf spark.driver.userClassPathFirst=true 但现在它与 MapR 一样失败
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
我们正在使用 MapR 分发,并且在 spark-submit 中更改类路径后,它因上述异常而失败。
有人可以在这里提供帮助,或者我的基本需要将 Avro GenericRecord 转换为 Spark Row,以便我可以用它创建 Dataframe,请帮助
谢谢。