问题标签 [apache-spark-dataset]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
14 回答
148824 浏览

dataframe - Spark中DataFrame、Dataset和RDD的区别

我只是想知道 Apache Spark 中的RDDDataFrame (Spark 2.0.0 DataFrame 只是类型别名Dataset[Row]有什么区别?

你能把一个转换成另一个吗?

0 投票
1 回答
20217 浏览

performance - DataFrame / Dataset groupBy 行为/优化

假设我们有df由以下列组成的 DataFrame:

姓名、姓氏、尺寸、宽度、长度、重量

现在我们要执行几个操作,例如我们要创建几个包含大小和宽度数据的 DataFrame。

如您所见,其他列(例如 Length)并未在任何地方使用。Spark 是否足够聪明,可以在洗牌阶段之前删除冗余列,还是随身携带?威尔运行:

在分组之前以某种方式影响性能?

0 投票
2 回答
20901 浏览

java - 如何在 Java 中将 DataFrame 转换为 Apache Spark 中的数据集?

我可以很容易地将 DataFrame 转换为 Scala 中的 Dataset:

但在 Java 版本中,我不知道如何将 Dataframe 转换为 Dataset?任何的想法?

我的努力是:

但编译器说:

编辑(解决方案):

基于@Leet-Falcon 答案的解决方案:

0 投票
3 回答
22048 浏览

scala - 为什么使用案例类对 JSON 进行编码时出现错误“无法找到存储在数据集中的类型的编码器”?

我写过火花工作:

在 IDE 中,当我运行 main 函数时,出现 2 个错误:

但在 Spark Shell 中,我可以运行此作业而不会出现任何错误。问题是什么?

0 投票
2 回答
2185 浏览

apache-spark - Spark 1.6 中的数据集

我正在评估将现有的 RDD 代码替换为数据集。对于我的一个用例,我无法将数据集映射到另一个案例类。

这就是我想要做的......

任何帮助,将不胜感激。

除了以下情况:

线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:任务不可序列化:java.io.NotSerializableException:scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 序列化堆栈:- 对象不是可序列化(类:scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1,值:包 lang)-字段(类:scala.reflect.internal.Types$ThisType,名称:sym,类型:类 scala.reflect.internal .Symbols$Symbol) - 对象 (class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - 字段 (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect .internal.Types$Type) - 对象 (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - 字段 (class: scala.reflect.internal.Types$TypeRef, name: normalized,类型:类 scala.reflect.internal.Types$Type)-对象(类 scala.reflect.internal.Types$AliasNoArgsTypeRef,字符串)-字段(类:org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6,名称:keyType$1,类型:class scala.reflect.api.Types$TypeApi) - 对象(类 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - 字段(类:org.apache.spark. sql.catalyst.expressions.MapObjects,名称:函数,类型:接口 scala.Function1) - 对象(类 org.apache.spark.sql.catalyst.expressions.MapObjects,mapobjects(,invoke(upcast('map,MapType(StringType ,StringType,true),- 字段 (类: "scala.collection.immutable.Map", 名称: "map"),- 根类: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType )) - 字段(类:org.apache.spark.sql.catalyst.expressions。调用,名称:targetObject,类型:类 org.apache.spark.sql.catalyst.expressions.Expression) - 对象(类 org.apache.spark.sql.catalyst.expressions.Invoke,invoke(mapobjects(,invoke(upcast( 'map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType ,true)),StringType),array,ObjectType(class [Ljava.lang.Object;))) - writeObject 数据(类:scala.collection.immutable.List$SerializationProxy) - 对象(类 scala.collection.immutable.List $SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(invoke(地图对象(,调用(upcast('map,MapType(StringType,StringType,true),-字段(类:“scala.collection.immutable.Map”,名称:“map”),-根类:“collector.MyMap”),keyArray ,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 字段(类:“scala.collection.immutable.Map”,名称:“map”),-根类:“collector.MyMap”),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(类[ Ljava.lang.Object;)))) - 字段(类:org.apache.spark.sql.catalyst.expressions.StaticInvoke,名称:参数,类型:接口 scala.collection.Seq) - 对象(类 org.apache。 spark.sql.catalyst.expressions.StaticInvoke,staticinvoke(类 org.apache.spark.sql。catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection. immutable.Map", name: "map"),- 根类: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)) ,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector .MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true)) - writeObject 数据(类:scala.collection.immutable.List$ SerializationProxy) - 对象(类 scala.collection.immutable。List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(staticinvoke (类 org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true), - 字段(类:“scala.collection.immutable.Map”,名称:“map”),- 根类:“collector.MyMap”),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(类 [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true)),- field (class: "scala.collection.immutable.Map", name: " map"),- 根类:"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))) - 字段(类:org.apache.spark.sql .catalyst.expressions.NewInstance,名称:参数,类型:接口 scala.collection.Seq) - 对象(类 org.apache.spark.sql.catalyst.expressions.NewInstance,newinstance(class collector.MyMap,staticinvoke(class org. apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-字段(类:“scala.collection.immutable.Map”,名称:“map”),-根类:“collector.MyMap”),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(类[Ljava。 lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 字段(类:“scala.collection.immutable.Map”,名称:“map”),- 根类:“collector.MyMap”),valueArray,ArrayType(StringType,true) ),StringType),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None)) - 字段(类:org.apache.spark.sql.catalyst .encoders.ExpressionEncoder,名称:fromRowExpression,类型:类 org.apache.spark.sql.catalyst.expressions.Expression) - 对象(类 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,类[map#ExprId( 9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map]) - 字段(类:org.apache.spark.sql.execution.MapPartitions,名称:uEncoder,类型:类 org.apache.spark.sql.catalyst.encoders .ExpressionEncoder) - 对象(类 org.apache.spark.sql.execution.MapPartitions,!MapPartitions , class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +- LocalTableScan [ a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,326174:org.4] 类字段。 sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions) - 对象(class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, ) - 字段(类:org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,名称:f$22,类型:接口 scala.Function1) - 对象(类 org.apache.spark.rdd.RDD$$anonfun $mapPartitionsInternal$1, ) - 字段(类:org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,名称:$outer,类型:类 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) - 对象(类 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, ) - 字段(类:org.apache. spark.rdd.MapPartitionsRDD,名称:f,类型:接口 scala.Function3)-对象(类 org.apache.spark.rdd.MapPartitionsRDD,MapPartitionsRDD[1] 在 CollectorSparkTest.scala:50 上展示)-字段(类:org .apache.spark.NarrowDependency,名称:NarrowDependency,名称:NarrowDependency,名称:rdd, type: class org.apache.spark.rdd.RDD) - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7) - writeObject data (class: scala.collection.immutable.List $SerializationProxy) - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala .collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7)) - 字段(类:org.apache.spark.rdd.RDD,名称:org$apache$spark$rdd$RDD$ $依赖项, type: interface scala.collection.Seq) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50) - field (class: scala.Tuple2, name: _1,类型:类 java.lang.Object) - org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ 上的对象 (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,)) scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$ anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at组织。apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010) at org.apache.spark.scheduler.DAGScheduler.org$apache$ spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala :1607) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 在 org.apache.spark.util 的 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)。 EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark。SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark .sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) 在 org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 在 org.apache.spark.sql.execution.SparkPlan.executeCollectPublic (SparkPlan.scala:174) 在 org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 在 org.apache.spark .sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$$execute$1$1.apply(DataFrame.scala:1538) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala :56) 在 org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) 在 org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) 在 org.apache.spark.sql.DataFrame.org$apache$ spark$sql$DataFrame$$collect(DataFrame.scala:1544) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame $$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala :1413) 在 org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) 在 org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) 在 org.apache.spark.sql。 DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.Dataset.show(Dataset.scala:228) at org.apache.spark.sql.Dataset.show(Dataset.scala:192) at org .apache.spark.sql.Dataset.show(Dataset.scala:200)

0 投票
2 回答
3024 浏览

apache-spark - 如何将数据集序列化为二进制文件/镶木地板?

我应该如何序列化一个DataSet?有没有办法使用Encoder创建二进制文件,或者我应该将其转换为 aDataFrame然后将其保存为镶木地板?

0 投票
1 回答
830 浏览

apache-spark - Spark SQL 中有关 Dataset.filter 的错误

我只想过滤数据集以包含可以在 MySQL 中找到的记录。

这是数据集:

这是 MySQL 中的表:

这是我的代码(在 spark-shell 中运行):

但我得到“java.lang.NullPointerException”

我已经测试过了

我可以得到正确的结果 1 和 0。

过滤器有什么问题?

0 投票
1 回答
452 浏览

scala - 如何重命名通过在 Apache Spark 中的 GroupedDataset 上操作创建的新列?

如何在count不将结果转换为 DataFrame 的情况下重命名操作列?

0 投票
3 回答
4372 浏览

apache-spark - 如何从自定义类 Person 创建数据集?

我试图Dataset在 Java 中创建一个,所以我编写了以下代码:

Person类是一个内部类。

然而,Spark 会抛出以下异常:

如何正确地做到这一点?

0 投票
1 回答
674 浏览

scala - 如何为 Apache Spark 数据集中的枚举列编写案例类?

我想将我的数据转换为数据集。我有一个列名 storyType(小、中、大、xlarge)。所以我不知道在这种情况下如何编写我的案例类