我正在尝试将 Spark DataFrame 写入为 Delta 表。它在我的 IDE(IntelliJ)中运行良好,但在依赖项和版本完全相同的情况下,在 Spark REPL(spark-shell)中却无法运行。
Spark 版本:2.4.0 Scala 版本:2.11.8
IntelliJ 中的依赖项(这是整个项目的依赖项,请忽略与本问题无关的部分):
compile 'org.scala-lang:scala-library:2.11.8'
compile 'org.scala-lang:scala-reflect:2.11.8'
compile 'org.scala-lang:scala-compiler:2.11.8'
compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.1.2'
compile 'org.scala-lang.modules:scala-swing_2.11:2.0.3'
compile 'org.apache.spark:spark-mllib_2.11:2.4.0'
compile 'org.apache.spark:spark-sql_2.11:2.4.0'
compile 'org.apache.spark:spark-graphx_2.11:2.4.0'
compile 'org.apache.spark:spark-launcher_2.11:2.4.0'
compile 'org.apache.spark:spark-catalyst_2.11:2.4.0'
compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
compile group: 'io.delta', name: 'delta-core_2.11', version: '0.5.0'
compile 'org.apache.spark:spark-core_2.11:2.4.0'
compile 'org.apache.spark:spark-hive_2.11:2.4.0'
compile 'com.databricks:spark-avro_2.11:4.0.0'
compile 'org.apache.avro:avro-mapred:1.8.2'
compile 'org.apache.avro:avro:1.8.2'
compile 'org.apache.avro:avro-compiler:1.8.2'
compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.15'
compile group: 'commons-io', name: 'commons-io', version: '2.5'
testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.26'
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.scalatest', name: 'scalatest_2.12', version: '3.2.0-SNAP10'
compile group: 'javax.mail', name: 'javax.mail-api', version: '1.6.2'
compile group: 'com.sun.mail' ,name: 'javax.mail', version: '1.6.0'
compile 'com.hortonworks:shc-core:1.1.1-2.1-s_2.11'
compile 'com.hortonworks:shc:1.1.1-2.1-s_2.11'
compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase', version: '1.2.5', ext: 'pom'
compile group: 'org.apache.hbase', name: 'hbase-protocol', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-hadoop2-compat', version: '1.2.5'
compile group: 'org.apache.hbase', name: 'hbase-annotations', version: '1.2.5'
// jackson modules
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.6'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
compile group: 'org.codehaus.jackson', name: 'jackson-core-asl', version: '1.9.13'
compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.8.7'
compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.8.6'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.8.6'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.8.6'
compile group: 'org.json4s', name: 'json4s-jackson_2.11', version: '3.2.10'
compile group: 'com.twitter', name: 'parquet-jackson', version: '1.6.0'
compile group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13'
compile group: 'org.codehaus.jackson', name: 'jackson-xc', version: '1.9.13'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-paranamer', version: '2.8.6'
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-annotations', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-auth', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-yarn-common', version: '2.7.3'
我正在尝试执行的一段代码
import io.delta._
val dF=spark.read.load("path") //parquet file
dF.write.format("delta").mode("overwrite").partitionBy("topic","partition","key").save("path") // delta table
spark-shell 使用的命令:
spark-shell --packages com.fasterxml.jackson.core:jackson-databind:2.8.6,com.fasterxml.jackson.core:jackson-core:2.10.0,org.codehaus.jackson:jackson-core-asl:1.9.13,org.codehaus.jackson:jackson-mapper-asl:1.9.13,com.fasterxml.jackson.core:jackson-annotations:2.8.7,com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.6,com.fasterxml.jackson.module:jackson-module-scala_2.11:2.8.6,com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.8.6,com.twitter:parquet-jackson:1.6.0,org.codehaus.jackson:jackson-jaxrs:1.9.13,org.codehaus.jackson:jackson-xc:1.9.13,com.fasterxml.jackson.module:jackson-module-paranamer:2.8.6,io.delta:delta-core_2.11:0.5.0,commons-io:commons-io:2.5
REPL 中的错误:
Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:127)
at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:202)
at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:201)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.delta.actions.Metadata.schema$lzycompute(actions.scala:201)
at org.apache.spark.sql.delta.actions.Metadata.schema(actions.scala:200)
at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:61)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:45)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:85)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:65)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:396)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:133)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at org.controller.deltaLakeEG.deltaLakeHadoopEg$.main(deltaLakeHadoopEg.scala:29)
at org.controller.deltaLakeEG.deltaLakeHadoopEg.main(deltaLakeHadoopEg.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)