
I am trying to access a Hive table, extract and transform certain columns from the table/DataFrame, and then put those new columns into a new DataFrame. I am trying to do it this way:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val hiveDF = sqlContext.sql("select * from table_x")

val system_generated_id = hiveDF("unique_key")
val application_assigned_event_id = hiveDF("event_event_id")

val trnEventDf = sqlContext.emptyDataFrame
trnEventDf.withColumn("system_generated_id", lit(system_generated_id))

It builds with sbt without any errors, but when I try to run it, I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$14.apply(Analyzer.scala:354)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$14.apply(Analyzer.scala:353)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:353)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:347)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
    at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)
    at bacon$.main(bacon.scala:31)
    at bacon.main(bacon.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)

I would like to understand what causes this error, and whether there is another way to accomplish what I am trying to do.


1 Answer


Generally, you don't need to create a new DataFrame for this. When you transform the existing DataFrame by adding the unique ID column to it, you get the DataFrame you need. If you want to save it, just save it as a new Hive table.
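A minimal sketch of that approach, using the table and column names from the question (Spark 1.x `HiveContext` API, as in the question; the target table name `new_table` is a placeholder):

```scala
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val hiveDF = sqlContext.sql("select * from table_x")

// DataFrames are immutable: select/withColumn return a new, transformed
// DataFrame, so there is no need to start from an empty one.
val trnEventDf = hiveDF.select(
  hiveDF("unique_key").as("system_generated_id"),
  hiveDF("event_event_id").as("application_assigned_event_id"))

// Persist the transformed result as a new Hive table if needed
// ("new_table" is a hypothetical name).
trnEventDf.write.saveAsTable("new_table")
```

The error in the original code comes from calling `withColumn` on `sqlContext.emptyDataFrame`, which has no rows or schema to resolve the column against; also note that `withColumn` returns a new DataFrame rather than modifying the receiver, so its result must be assigned.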

answered 2016-07-03T04:18:49.660