
Hi, I am reading Parquet files in a Spark context and merging the schemas of the various Parquet files. I then take the DataFrame returned by the read parquet call and save it with the write.parquet command.

Here is the code: 

    val options: Map[String, String] = Map[String, String]()
    val options2 = options + ("mergeSchema" -> "true")
    val df = sqc.read.options(options2).schema(schemaObj).parquet(paths: _*)
    df.printSchema()
    df.coalesce(1).write.parquet("/home/abc/")

Here schemaObj is of type StructType (the Parquet schema), which was converted from an AvroSchema.
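
For illustration, a minimal sketch of what such a schema object might look like if built by hand (the field names here are hypothetical; the real schemaObj is derived from an Avro schema):

    import org.apache.spark.sql.types._

    // Hypothetical stand-in for schemaObj; the real one is converted from an Avro schema.
    val schemaObj = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)
    ))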

My sbt dependencies are as follows:

    "org.apache.spark"     % "spark-core_2.10"          % "1.6.0" %provided",
    "org.apache.spark"     % "spark-sql_2.10"                % "1.6.0",
    "com.amazonaws"        % "aws-java-sdk"                  % "1.9.27",
    "com.databricks"       % "spark-avro_2.10"               % "2.0.1",
    "org.apache.avro"      % "avro"                          % "1.7.6",
    "io.confluent"         % "kafka-avro-serializer"         % "1.0",
    "mysql"                % "mysql-connector-java"          % "5.1.6",
    "io.confluent"         %"kafka-schema-registry-client"   % "2.0.1",
    "com.twitter"            %"parquet-avro"                 % "1.6.0"



I am getting the following error:

    Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
     at [Source: {"id":"0","name":"ExecutedCommand"}; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
        at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
        at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
        at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
        at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3668)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3560)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2580)
        at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85)
        at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1616)
        at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1616)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.rdd.RDD.<init>(RDD.scala:1616)
        at org.apache.spark.rdd.SqlNewHadoopRDD.<init>(SqlNewHadoopRDD.scala:64)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1.<init>(ParquetRelation.scala:327)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1.apply(ParquetRelation.scala:327)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1.apply(ParquetRelation.scala:327)
        at org.apache.spark.util.Utils$.withDummyCallSite(Utils.scala:2165)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.buildInternalScan(ParquetRelation.scala:326)
        at org.apache.spark.sql.sources.HadoopFsRelation.buildInternalScan(interfaces.scala:661)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:113)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:113)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:109)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
        at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:1837)

1 Answer


I was able to solve the problem by adding the following dependency override:

    dependencyOverrides ++= Set("com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4")
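
As far as I understand, dependencyOverrides makes sbt force that exact jackson-databind version even when a transitive dependency asks for a different one, which avoids the version conflict at runtime.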

I am not sure, but perhaps parquet-mr was using an older version of jackson-databind. That library is used to convert the AvroSchema into a ParquetSchema.
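
For reference, a minimal sketch of the Avro-to-Parquet schema conversion referred to above, assuming the pre-Apache `parquet.avro.AvroSchemaConverter` class shipped in the `"com.twitter" % "parquet-avro" % "1.6.0"` artifact (the record definition here is hypothetical):

    import org.apache.avro.Schema
    import parquet.avro.AvroSchemaConverter  // pre-Apache package name in parquet-avro 1.6.0

    // Hypothetical Avro record schema, used only for illustration.
    val avroSchema = new Schema.Parser().parse(
      """{"type":"record","name":"Example","fields":[{"name":"id","type":"long"}]}""")

    // Convert the Avro schema into a Parquet MessageType.
    val parquetSchema = new AvroSchemaConverter().convert(avroSchema)
    println(parquetSchema)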
