问题标签 [spark-dataframe]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
936 浏览

scala - 无法从引发 serde 异常的 spark 将数据帧保存为配置单元表

我已经在数据框中加载了我的一个表并尝试将其保存为配置单元表

我遇到异常 org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: cannot find field mytable from [public java.util.ArrayList org.apache.hadoop.hive.serde2.ColumnSet.上校]

0 投票
3 回答
19118 浏览

java - 如何使用 Java 将 unix 纪元列转换为 Apache spark DataFrame 中的日期?

我有一个 json 数据文件,其中包含一个属性 [creationDate],它是“长”数字类型的 unix epoc。Apache Spark DataFrame 架构如下所示:

我想做一些需要从“creationDate”获取的groupBy“creationData_Year”。

使用 Java在DataFrame中进行这种转换的最简单方法是什么?

0 投票
2 回答
20901 浏览

java - 如何在 Java 中将 DataFrame 转换为 Apache Spark 中的数据集?

我可以很容易地将 DataFrame 转换为 Scala 中的 Dataset:

但在 Java 版本中,我不知道如何将 Dataframe 转换为 Dataset?任何的想法?

我的努力是:

但编译器说:

编辑(解决方案):

基于@Leet-Falcon 答案的解决方案:

0 投票
1 回答
886 浏览

apache-spark - 在 Spark DataFrame 上保存到 JSON 并重新加载,模式列序列发生变化

我正在使用 spark DataFrames 并尝试对相同模式的 DataFrames 进行重复数据删除。

将 DataFrame 保存到 JSON 之前的架构如下:

从 JSON 文件加载后的 DataFrame 架构如下:

我将 JSON 保存为:

并回读为:

做 unionAll 之后

或除了

由于架构更改,重复数据删除失败。

我可以避免这种模式转换吗?有没有办法在保存到 JSON 文件和从 JSON 文件加载回来时保存(或强制执行)模式序列?

0 投票
3 回答
4796 浏览

scala - Spark 1.6:java.lang.IllegalArgumentException:spark.sql.execution.id 已设置

我正在使用 spark 1.6 并在运行以下代码时遇到上述问题:

这是 github 要点:https ://gist.github.com/karanveerm/27d852bf311e39f05491

我得到的错误是:在

这是火花错误还是我做错了什么/任何解决方法?

0 投票
2 回答
2185 浏览

apache-spark - Spark 1.6 中的数据集

我正在评估将现有的 RDD 代码替换为数据集。对于我的一个用例,我无法将数据集映射到另一个案例类。

这就是我想要做的......

任何帮助,将不胜感激。

除了以下情况:

线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:任务不可序列化:java.io.NotSerializableException:scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 序列化堆栈:- 对象不是可序列化(类:scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1,值:包 lang)-字段(类:scala.reflect.internal.Types$ThisType,名称:sym,类型:类 scala.reflect.internal .Symbols$Symbol) - 对象 (class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - 字段 (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect .internal.Types$Type) - 对象 (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - 字段 (class: scala.reflect.internal.Types$TypeRef, name: normalized,类型:类 scala.reflect.internal.Types$Type)-对象(类 scala.reflect.internal.Types$AliasNoArgsTypeRef,字符串)-字段(类:org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6,名称:keyType$1,类型:class scala.reflect.api.Types$TypeApi) - 对象(类 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - 字段(类:org.apache.spark. sql.catalyst.expressions.MapObjects,名称:函数,类型:接口 scala.Function1) - 对象(类 org.apache.spark.sql.catalyst.expressions.MapObjects,mapobjects(,invoke(upcast('map,MapType(StringType ,StringType,true),- 字段 (类: "scala.collection.immutable.Map", 名称: "map"),- 根类: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType )) - 字段(类:org.apache.spark.sql.catalyst.expressions。调用,名称:targetObject,类型:类 org.apache.spark.sql.catalyst.expressions.Expression) - 对象(类 org.apache.spark.sql.catalyst.expressions.Invoke,invoke(mapobjects(,invoke(upcast( 'map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType ,true)),StringType),array,ObjectType(class [Ljava.lang.Object;))) - writeObject 数据(类:scala.collection.immutable.List$SerializationProxy) - 对象(类 scala.collection.immutable.List $SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(invoke(地图对象(,调用(upcast('map,MapType(StringType,StringType,true),-字段(类:“scala.collection.immutable.Map”,名称:“map”),-根类:“collector.MyMap”),keyArray ,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 字段(类:“scala.collection.immutable.Map”,名称:“map”),-根类:“collector.MyMap”),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(类[ Ljava.lang.Object;)))) - 字段(类:org.apache.spark.sql.catalyst.expressions.StaticInvoke,名称:参数,类型:接口 scala.collection.Seq) - 对象(类 org.apache。 spark.sql.catalyst.expressions.StaticInvoke,staticinvoke(类 org.apache.spark.sql。catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection. immutable.Map", name: "map"),- 根类: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)) ,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector .MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true)) - writeObject 数据(类:scala.collection.immutable.List$ SerializationProxy) - 对象(类 scala.collection.immutable。List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(staticinvoke (类 org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true), - 字段(类:“scala.collection.immutable.Map”,名称:“map”),- 根类:“collector.MyMap”),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(类 [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true)),- field (class: "scala.collection.immutable.Map", name: " map"),- 根类:"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))) - 字段(类:org.apache.spark.sql .catalyst.expressions.NewInstance,名称:参数,类型:接口 scala.collection.Seq) - 对象(类 org.apache.spark.sql.catalyst.expressions.NewInstance,newinstance(class collector.MyMap,staticinvoke(class org. apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-字段(类:“scala.collection.immutable.Map”,名称:“map”),-根类:“collector.MyMap”),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(类[Ljava。 lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- 字段(类:“scala.collection.immutable.Map”,名称:“map”),- 根类:“collector.MyMap”),valueArray,ArrayType(StringType,true) ),StringType),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None)) - 字段(类:org.apache.spark.sql.catalyst .encoders.ExpressionEncoder,名称:fromRowExpression,类型:类 org.apache.spark.sql.catalyst.expressions.Expression) - 对象(类 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,类[map#ExprId( 9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map]) - 字段(类:org.apache.spark.sql.execution.MapPartitions,名称:uEncoder,类型:类 org.apache.spark.sql.catalyst.encoders .ExpressionEncoder) - 对象(类 org.apache.spark.sql.execution.MapPartitions,!MapPartitions , class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +- LocalTableScan [ a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,326174:org.4] 类字段。 sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions) - 对象(class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, ) - 字段(类:org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,名称:f$22,类型:接口 scala.Function1) - 对象(类 org.apache.spark.rdd.RDD$$anonfun $mapPartitionsInternal$1, ) - 字段(类:org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,名称:$outer,类型:类 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) - 对象(类 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, ) - 字段(类:org.apache. spark.rdd.MapPartitionsRDD,名称:f,类型:接口 scala.Function3)-对象(类 org.apache.spark.rdd.MapPartitionsRDD,MapPartitionsRDD[1] 在 CollectorSparkTest.scala:50 上展示)-字段(类:org .apache.spark.NarrowDependency,名称:NarrowDependency,名称:NarrowDependency,名称:rdd, type: class org.apache.spark.rdd.RDD) - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7) - writeObject data (class: scala.collection.immutable.List $SerializationProxy) - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala .collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7)) - 字段(类:org.apache.spark.rdd.RDD,名称:org$apache$spark$rdd$RDD$ $依赖项, type: interface scala.collection.Seq) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50) - field (class: scala.Tuple2, name: _1,类型:类 java.lang.Object) - org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ 上的对象 (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,)) scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$ anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at组织。apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010) at org.apache.spark.scheduler.DAGScheduler.org$apache$ spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala :1607) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 在 org.apache.spark.util 的 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)。 EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark。SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark .sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) 在 org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 在 org.apache.spark.sql.execution.SparkPlan.executeCollectPublic (SparkPlan.scala:174) 在 org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 在 org.apache.spark .sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$$execute$1$1.apply(DataFrame.scala:1538) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala :56) 在 org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) 在 org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) 在 org.apache.spark.sql.DataFrame.org$apache$ spark$sql$DataFrame$$collect(DataFrame.scala:1544) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame $$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala :1413) 在 org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) 在 org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) 在 org.apache.spark.sql。 DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.Dataset.show(Dataset.scala:228) at org.apache.spark.sql.Dataset.show(Dataset.scala:192) at org .apache.spark.sql.Dataset.show(Dataset.scala:200)

0 投票
1 回答
673 浏览

pyspark - SparkSQL,Spark DataFrame:批量重命名 csv 标头

我正在尝试更改一个非常大的 csv 文件的标题。

我使用 SparkSQL

所有标题some_string在每个标题名称中都有,例如some_string.header_name

我的 Spark 配置conf = SparkConf().setMaster("local[*]").setAppName("readCSV")

要读取我使用的 csv 文件com.databricks.spark.csv package

我的代码

产生错误:

我不喜欢使用logs_df.withColumnRenamed(),因为我有超过 200 列

非常适合任何想法如何快速有效地更改标题

0 投票
2 回答
1180 浏览

arrays - SparkR - 提取数据框的数组对于 R 函数

我有 1000 个传感器,我需要对数据进行分区(即每天每个传感器),然后将每个数据点列表提交给 R 算法)。使用 Spark,简化示例如下所示:

我转换为镶木地板文件,保存它。在 SparkR 中加载镶木地板,没问题,架构说:

所以在 SparkR 中,我有一个数据框,其中每条记录都有我想要的所有数据(df$value)。我想将该数组提取到 R 可以使用的东西中,然后用一个包含结果数组的新列来改变我的原始数据帧(df)。逻辑上类似于结果 = 函数(df$value)。然后我需要将结果(所有行)返回到 SparkR 数据帧中以进行输出。

如何从 SparkR 数据帧中提取一个数组,然后根据结果进行变异?

0 投票
1 回答
2405 浏览

apache-spark - 在 Spark 中将列附加到行

我有一个DataFrame我想通过新列扩展的。这里DateFrame解释了从Rows创建一个新的。

我目前的策略是Row使用 RowFactory 从Row传递到我调用的地图中的 s构造新的 s,DataFrame.javaRDD().map(...)但我担心这可能会产生不必要的成本。

所以我想知道是否可以通过附加新字段来Row扩展现有的 s,而不是创建新的 s。界面似乎不允许这样做RowRow

行代码

0 投票
1 回答
3082 浏览

scala - 未找到:值 udf 错误

我在我的代码中定义了一个 udf,如下所示:

我正在尝试在我的数据集中获取时间戳字段的子字符串。但是我收到了未找到的错误:value udf

我究竟做错了什么?