
I am evaluating replacing my existing RDD code with Datasets. For one of my use cases, I can't map a Dataset to another case class.

Here is what I'm trying to do...

case class MyMap(map: Map[String, String])

case class V1(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

object MyApp extends App {
  //Get handle to sqlContext and other useful stuff here.
  import sqlContext.implicits._ //needed for the case-class encoders used below

  val df1 = sqlContext.createDataset(Seq(V1("2015-05-01", "data1"), V1("2015-05-01", "data2"))).toDF()
  df1.as[V1].map(_.toMyMap).show() //Errors out. Added the exception below.
  df1.as[V1].map(_.toStr).show() //Works fine.
}

Any help would be appreciated.

Here is the exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack:
    - object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
    - object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
    - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
    - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
    - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
    - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
    - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
    - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
    - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
    - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
    - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf)
    - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;))))
    - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))
    - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5)
    - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.$colon$colon, List(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true)))
    - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None))
    - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map])
    - field (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
    - object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions <function1>, class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13]
+- LocalTableScan [a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]]
)
    - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
    - object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, )
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7)
    - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696)
    - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,))
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:228)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:192)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:200)


2 Answers


I think you may actually be hitting SPARK-12696, which has been fixed in spark/master. I'm hoping 1.6.1, which should include this patch, will be released in the near future.
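Until a release with that patch is available, one possible interim workaround (my own sketch, not part of the fix, and assuming the same sqlContext, V1, and MyMap definitions from the question) is to drop back to the RDD API for the step that produces MyMap, since no Dataset encoder has to be generated once you leave the Dataset:

// Sketch of an interim workaround: do the final mapping on the RDD, so Spark
// never has to build a Dataset encoder for MyMap and its Map[String, String] field.
import sqlContext.implicits._

val df1 = sqlContext.createDataset(Seq(V1("2015-05-01", "data1"), V1("2015-05-01", "data2"))).toDF()
val myMaps: org.apache.spark.rdd.RDD[MyMap] = df1.as[V1].rdd.map(_.toMyMap)
myMaps.collect().foreach(println) // MyMap(Map(2015-05-01 -> data1)), MyMap(Map(2015-05-01 -> data2))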

Answered 2016-01-20T20:23:05.723

The problem is that the Scala Map class is not serializable, so the Dataset API cannot automatically generate an appropriate encoder for it. I would suggest converting the map to a string, then parsing the string and converting it back into a map (assuming you are storing strings in the map).
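A rough sketch of that suggestion, using my own (hypothetical) delimiters and helper names, and assuming neither keys nor values contain the delimiter characters:

// Carry the map through the Dataset as a single String, and rebuild the
// Map[String, String] only after the Dataset transformation is done.
case class MyMapAsString(encoded: String)

def encodeMap(m: Map[String, String]): String =
  m.map { case (k, v) => s"$k=$v" }.mkString(";")

def decodeMap(s: String): Map[String, String] =
  if (s.isEmpty) Map.empty[String, String]
  else s.split(";").map { kv =>
    val Array(k, v) = kv.split("=", 2)
    k -> v
  }.toMap

// Usage with the question's df1 (a case class with a single String field encodes fine):
// val encoded = df1.as[V1].map(v => MyMapAsString(encodeMap(Map(v.a -> v.b))))
// val decoded = encoded.collect().map(r => decodeMap(r.encoded))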

The Dataset API may also not be the best option here. I wrote this article, which may be of interest.

Answered 2016-01-16T14:15:40.710