I am new to Apache Hudi and am trying to write my DataFrame to a Hudi table using spark-shell. For the first insert I have not created any table and am writing in overwrite mode, so I expect it to create the Hudi table for me. I am running the following code.
spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
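In case the error is version-related, this is how I check which Spark version my shell is actually running (my assumption is that the spark-avro and Hudi bundle versions need to match it):

// Inside spark-shell: print the running Spark version.
// My assumption: spark-avro_2.12:3.0.1 expects a Spark 3.0.x runtime.
spark.version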
// Initialize a Spark session for Hudi
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.SparkSession
val spark1 = SparkSession.builder().appName("hudi-datalake").master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").config("spark.sql.hive.convertMetastoreParquet", "false").getOrCreate()
//Write to a Hudi Dataset
val inputDF = Seq(
  ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
  ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
  ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
  ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
  ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
  ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
val hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "work.hudi_test",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "work.hudi_test",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
// Upsert Data
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("2014-01-01"))
updateDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).saveAsTable("work.hudi_test")
When I execute this write statement, I get the error message below:
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2
Could you please guide me on how to write this statement correctly?
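For reference, my understanding from the Hudi quickstart is that writes usually target a base path via .save() rather than saveAsTable(), so this is the variant I would try next; the /tmp/hudi/hudi_test path below is just a placeholder of mine, and I am not sure it addresses the NoClassDefFoundError:

// Path-based Hudi write, per my reading of the quickstart docs.
// The target path is a placeholder, not a confirmed fix.
updateDF.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save("/tmp/hudi/hudi_test")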