
I am using a Spark SQL Dataset to write data into Hive. It works fine when the schema is unchanged, but if I evolve the Avro schema by adding a new column in the middle, it throws the following error (the schema is supplied by the Schema Registry):

Error running job streaming job 1519289340000 ms.0 org.apache.spark.sql.AnalysisException: The column number of the existing table default.sample(struct<collection_timestamp:bigint,managed_object_id:string,managed_object_type:string,if_admin_status:string,date:string,hour:int,quarter:bigint>) doesn't match the data schema(struct<collection_timestamp:bigint,managed_object_id:string,if_oper_status:string,managed_object_type:string,if_admin_status:string,date:string,hour:int,quarter:bigint>);

if_oper_status is the new column that has to be added. Please advise.

StructType struct = convertSchemaToStructType(SchemaRegstryClient.getLatestSchema("simple"));
Dataset<Row> dataset = getSparkInstance().createDataFrame(newRDD, struct);

// Derive the partition columns from the current date/time.
dataset = dataset.withColumn("date", functions.date_format(functions.current_date(), "dd-MM-yyyy"));
dataset = dataset.withColumn("hour", functions.hour(functions.current_timestamp()));
dataset = dataset.withColumn("quarter", functions.floor(functions.minute(functions.current_timestamp()).divide(5)));

dataset
    .coalesce(1)
    .write().mode(SaveMode.Append)
    .option("charset", "UTF8")
    .partitionBy("date", "hour", "quarter")
    .option("checkpointLocation", "/tmp/checkpoint")
    .saveAsTable("sample");

2 Answers


I was able to solve this by saving the schema from the registry to a file and supplying avro.schema.url = <file path>, as shown below.

Note: this must be done before calling saveAsTable("sample").

dataset.sqlContext().sql(
    "CREATE EXTERNAL TABLE IF NOT EXISTS sample "
    + "PARTITIONED BY (dt STRING, hour STRING, quarter STRING) "
    + "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' "
    + "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' "
    + "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' "
    + "LOCATION 'hdfs://localhost:9000/user/root/sample' "
    + "TBLPROPERTIES ('avro.schema.url'='file://" + file.getAbsolutePath() + "')");
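For the `file` variable above, the schema fetched from the registry has to be written to a local file first. A minimal, self-contained sketch of that step (the schema JSON here is an illustrative placeholder; in the real job it would come from SchemaRegstryClient.getLatestSchema("simple")):

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class SchemaToFile {

    // Writes the Avro schema JSON to a temporary .avsc file and returns it,
    // so its absolute path can be referenced via avro.schema.url.
    static File writeSchemaFile(String schemaJson) throws Exception {
        File file = File.createTempFile("sample-schema", ".avsc");
        Files.write(file.toPath(), schemaJson.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder schema; replace with the JSON returned by the registry.
        String schemaJson = "{\"type\":\"record\",\"name\":\"sample\","
            + "\"fields\":[{\"name\":\"if_oper_status\",\"type\":\"string\"}]}";
        File file = writeSchemaFile(schemaJson);
        // The DDL would then use:
        // TBLPROPERTIES ('avro.schema.url'='file://" + file.getAbsolutePath() + "')
        System.out.println(file.getAbsolutePath().endsWith(".avsc"));
    }
}
```

Because the table now reads its schema from avro.schema.url, regenerating this file whenever the registry schema evolves lets Hive pick up new columns without the column-count mismatch.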
Answered 2018-03-01T06:12:42.207

Please refer to this link: https://github.com/databricks/spark-avro/pull/155 . According to the commit history, the PR supporting evolving Avro schemas was included in the 3.1 release. Which version of spark-avro are you using in your code?
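To pin a release that includes that PR, the dependency can be declared explicitly; a sketch for a Maven build (the Scala suffix and version shown are assumptions and must match your Spark/Scala build):

```xml
<!-- Databricks spark-avro; use a version >= 3.1 for evolving-schema support -->
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.11</artifactId>
  <version>4.0.0</version>
</dependency>
```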

Answered 2018-02-22T10:13:47.847