我只是想知道 Apache Spark 中的RDD
和DataFrame
(Spark 2.0.0 DataFrame 只是类型别名Dataset[Row]
)有什么区别?
你能把一个转换成另一个吗?
我只是想知道 Apache Spark 中的RDD
和DataFrame
(Spark 2.0.0 DataFrame 只是类型别名Dataset[Row]
)有什么区别?
你能把一个转换成另一个吗?
DataFrame
通过谷歌搜索“DataFrame 定义”可以很好地定义A :
数据框是一个表格或二维数组结构,其中每一列包含一个变量的测量值,每一行包含一个案例。
因此,DataFrame
由于其表格格式,a 具有额外的元数据,这允许 Spark 对最终查询运行某些优化。
RDD
另一方面,An只是一个R弹性D分布式数据集,它更像是一个无法优化的数据黑盒,因为可以对其执行的操作不受限制。
RDD
但是,您可以通过它的方法从一个 DataFrame 转到一个rdd
,并且您可以通过该方法从一个转到RDD
一个DataFrame
(如果 RDD 是表格格式)toDF
通常DataFrame
,由于内置查询优化,建议尽可能使用。
首先是
DataFrame
从SchemaRDD
.
是的.. 和之间的转换Dataframe
是RDD
绝对可能的。
下面是一些示例代码片段。
df.rdd
是RDD[Row]
以下是创建数据框的一些选项。
1)yourrddOffrow.toDF
转换为DataFrame
.
2)使用createDataFrame
sql上下文
val df = spark.createDataFrame(rddOfRow, schema)
其中 schema 可以来自以下一些选项,如 nice SO post 所述。
来自 scala case class 和 scala reflection apiimport org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
或使用
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
如 Schema 所述,也可以使用
StructType
和 创建StructField
val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
RDD
接口:自 1.0 版本以来,
RDD
(弹性分布式数据集)API 一直在 Spark 中。API 提供了
RDD
许多转换方法,例如map
()、filter
() 和reduce
(),用于对数据执行计算。这些方法中的每一个都会产生一个新RDD
的表示转换后的数据。但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换。操作方法的示例是collect
() 和saveAsObjectFile
()。
RDD 示例:
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
示例:使用 RDD 按属性过滤
rdd.filter(_.age > 21)
DataFrame
APISpark 1.3 引入了一个新的
DataFrame
API 作为 Project Tungsten 计划的一部分,该计划旨在提高 Spark 的性能和可扩展性。API 引入了模式的DataFrame
概念来描述数据,允许 Spark 以比使用 Java 序列化更有效的方式管理模式并仅在节点之间传递数据。API与
DataFrame
API 完全不同,RDD
因为它是用于构建关系查询计划的 API,然后 Spark 的 Catalyst 优化器可以执行该计划。API 对于熟悉构建查询计划的开发人员来说是很自然的
示例 SQL 样式:
df.filter("age > 21");
限制: 因为代码是按名称引用数据属性,所以编译器不可能捕获任何错误。如果属性名称不正确,则仅在运行时创建查询计划时才会检测到错误。
API 的另一个缺点DataFrame
是它非常以 scala 为中心,虽然它确实支持 Java,但支持有限。
例如,当DataFrame
从现有RDD
的 Java 对象创建 a 时,Spark 的 Catalyst 优化器无法推断模式并假定 DataFrame 中的任何对象都实现了该scala.Product
接口。Scalacase class
开箱即用,因为他们实现了这个接口。
Dataset
API该
Dataset
API 在 Spark 1.6 中作为 API 预览版发布,旨在提供两全其美的体验;熟悉的面向对象编程风格和RDD
API 的编译时类型安全,但具有 Catalyst 查询优化器的性能优势。DataFrame
数据集也使用与API相同的高效堆外存储机制 。当涉及到序列化数据时,
Dataset
API 具有 编码器的概念,它在 JVM 表示(对象)和 Spark 的内部二进制格式之间进行转换。Spark 具有非常先进的内置编码器,它们生成字节码以与堆外数据交互,并提供对单个属性的按需访问,而无需反序列化整个对象。Spark 尚未提供用于实现自定义编码器的 API,但计划在未来的版本中提供。此外,该
Dataset
API 旨在与 Java 和 Scala 同等地工作。使用 Java 对象时,它们完全符合 bean 是很重要的。
示例Dataset
API SQL 样式:
dataset.filter(_.age < 21);
凯利板级流量。.(来自 spark 峰会的揭秘 DataFrame 和 Dataset 演示文稿)
进一步阅读... databricks文章 - 三个 Apache Spark API 的故事:RDD 与 DataFrames 和 Datasets
Apache Spark 提供三种类型的 API
这是 RDD、Dataframe 和 Dataset 之间的 API 比较。
Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是跨集群节点分区的元素集合,可以并行操作。
分布式收集:
RDD 使用 MapReduce 操作,该操作被广泛用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错。
不可变的: RDD 由分区的记录集合组成。分区是RDD中并行的基本单元,每个分区是数据的一个逻辑划分,是不可变的,是通过对现有分区的一些变换而创建的。不可变性有助于实现计算的一致性。
容错性: 如果我们丢失了 RDD 的某个分区,我们可以在 lineage 中重放该分区上的转换以实现相同的计算,而不是跨多个节点进行数据复制。这个特性是 RDD 的最大好处,因为它节省了在数据管理和复制方面付出了很多努力,从而实现了更快的计算。
惰性计算: Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记得应用于某些基础数据集的转换。仅当操作需要将结果返回给驱动程序时才计算转换。
功能转换: RDD 支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在对数据集运行计算后将值返回给驱动程序)。
数据处理格式:
它可以轻松有效地处理结构化数据和非结构化数据。
支持的编程语言:
RDD API 在 Java、Scala、Python 和 R 中可用。
没有内置优化引擎: 在处理结构化数据时,RDD 无法利用 Spark 的高级优化器,包括催化剂优化器和 Tungsten 执行引擎。开发者需要根据每个 RDD 的属性对其进行优化。
处理结构化数据: 与 Dataframe 和数据集不同,RDD 不会推断摄取数据的模式,并且需要用户指定它。
Spark 在 Spark 1.3 版本中引入了 Dataframes。Dataframe 克服了 RDD 所面临的关键挑战。
DataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库或 R/Python Dataframe 中的表。除了 Dataframe,Spark 还引入了催化剂优化器,它利用高级编程特性来构建可扩展的查询优化器。
行对象 的分布式集合:DataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库中的表,但在底层进行了更丰富的优化。
数据处理: 处理结构化和非结构化数据格式(Avro、CSV、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE 表、MySQL 等)。它可以从所有这些不同的数据源中读取和写入。
使用催化剂优化器进行优化: 它支持 SQL 查询和 DataFrame API。Dataframe分四个阶段使用催化剂树转换框架,
1.Analyzing a logical plan to resolve references
2.Logical plan optimization
3.Physical planning
4.Code generation to compile parts of the query to Java bytecode.
Hive 兼容性: 使用 Spark SQL,您可以在现有 Hive 仓库上运行未修改的 Hive 查询。它重用 Hive 前端和 MetaStore,并为您提供与现有 Hive 数据、查询和 UDF 的完全兼容性。
Tungsten: Tungsten 提供了一个物理执行后端,它显式地管理内存并动态生成用于表达式评估的字节码。
支持的编程语言:
Dataframe API 在 Java、Scala、Python 和 R 中可用。
例子:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
当您处理多个转换和聚合步骤时,这尤其具有挑战性。
例子:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
Dataset API 是 DataFrames 的扩展,它提供了一个类型安全的、面向对象的编程接口。它是映射到关系模式的强类型、不可变对象集合。
在 Dataset 的核心,API 是一个称为编码器的新概念,它负责在 JVM 对象和表格表示之间进行转换。表格表示使用 Spark 内部 Tungsten 二进制格式存储,允许对序列化数据进行操作并提高内存利用率。Spark 1.6 支持为各种类型自动生成编码器,包括原始类型(例如 String、Integer、Long)、Scala 案例类和 Java Bean。
提供最好的 RDD 和 Dataframe: RDD(函数式编程,类型安全),DataFrame(关系模型,查询优化,Tungsten 执行,排序和改组)
编码器: 使用编码器,可以很容易地将任何 JVM 对象转换为数据集,允许用户处理结构化和非结构化数据,这与 Dataframe 不同。
支持的编程语言: Datasets API 目前仅在 Scala 和 Java 中可用。1.6 版目前不支持 Python 和 R。Python 支持计划于 2.0 版。
类型安全: Datasets API 提供了 Dataframes 中不可用的编译时安全性。在下面的示例中,我们可以看到 Dataset 如何使用编译 lambda 函数对域对象进行操作。
例子:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
例子:
ds.select(col("name").as[String], $"age".as[Int]).collect()
不支持 Python 和 R:从 1.6 版开始,数据集仅支持 Scala 和 Java。Spark 2.0 将引入 Python 支持。
与现有的 RDD 和 Dataframe API 相比,Datasets API 带来了几个优势,具有更好的类型安全性和函数式编程。面对 API 中类型转换要求的挑战,您仍然无法满足所需的类型安全性,并且会使您的代码变得脆弱。
RDD
RDD
是可以并行操作的元素的容错集合。
DataFrame
DataFrame
是组织成命名列的数据集。它在概念上等同于关系数据库中的表或 R/Python 中的数据框,但在底层进行了更丰富的优化。
Dataset
Dataset
是数据的分布式集合。Dataset 是 Spark 1.6 中添加的一个新接口,它提供了RDD 的优势 (强类型化、使用强大 lambda 函数的能力) 以及 Spark SQL 优化执行引擎的优势。
笔记:
Scala/Java 中的 Dataset of Rows (
Dataset[Row]
) 通常称为 DataFrames。
问:您能否将一种转换为另一种,例如 RDD 转换为 DataFrame,反之亦然?
1.RDD
与DataFrame
_.toDF()
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
更多方法:在 Spark 中将 RDD 对象转换为 Dataframe
2./to with方法DataFrame
_ DataSet
_RDD
.rdd()
val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
因为DataFrame
是弱类型,开发人员没有得到类型系统的好处。例如,假设您想从 SQL 中读取某些内容并在其上运行一些聚合:
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
当您说people("deptId")
,您没有取回Int
或 aLong
时,您正在取回Column
需要对其进行操作的对象。在具有丰富类型系统的语言(例如 Scala)中,您最终会失去所有类型安全性,这会增加可能在编译时发现的运行时错误的数量。
反之,DataSet[T]
是打字。当你这样做时:
val people: People = val people = sqlContext.read.parquet("...").as[People]
您实际上是在取回一个People
对象,其中deptId
是实际的整数类型而不是列类型,从而利用了类型系统。
从 Spark 2.0 开始,DataFrame 和 DataSet API 将统一,其中DataFrame
将是DataSet[Row]
.
SimplyRDD
是核心组件,不过DataFrame
是 spark 1.30 中引入的一个 API。
数据分区的集合称为RDD
. 这些RDD
必须遵循一些属性,例如:
这里RDD
要么是结构化的,要么是非结构化的。
DataFrame
是 Scala、Java、Python 和 R 中可用的 API。它允许处理任何类型的结构化和半结构化数据。为了定义DataFrame
,组织成命名列的分布式数据集合称为DataFrame
. 您可以轻松地优化RDDs
. DataFrame
您可以使用 . 一次处理 JSON 数据、parquet 数据、HiveQL 数据DataFrame
。
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")
val sample_DF = sampleRDD.toDF()
这里 Sample_DF 认为DataFrame
. sampleRDD
被(原始数据)称为RDD
。
大部分答案都是正确的,只想在这里补充一点
在 Spark 2.0 中,这两个 API(DataFrame +DataSet)将被统一为一个 API。
“统一DataFrame和Dataset:在Scala和Java中,DataFrame和Dataset已经统一了,即DataFrame只是Dataset of Row的类型别名。在Python和R中,由于缺乏类型安全,DataFrame是主要的编程接口。”
数据集与 RDD 类似,但是,它们不使用 Java 序列化或 Kryo,而是使用专门的编码器来序列化对象以通过网络进行处理或传输。
Spark SQL 支持两种不同的方法将现有 RDD 转换为数据集。第一种方法使用反射来推断包含特定类型对象的 RDD 的模式。当您在编写 Spark 应用程序时已经知道架构时,这种基于反射的方法会产生更简洁的代码并且效果很好。
第二种创建数据集的方法是通过一个编程接口,它允许您构建一个模式,然后将其应用到现有的 RDD。虽然此方法更冗长,但它允许您在列及其类型直到运行时才知道时构造数据集。
在这里可以找到 RDD tof 数据框对话答案
DataFrame 等效于 RDBMS 中的表,也可以通过与 RDD 中的“本机”分布式集合类似的方式进行操作。与 RDD 不同,Dataframes 跟踪模式并支持各种关系操作,从而实现更优化的执行。每个 DataFrame 对象代表一个逻辑计划,但由于它们的“惰性”性质,在用户调用特定的“输出操作”之前不会执行。
Dataframe 是 Row 对象的 RDD,每个对象代表一条记录。Dataframe 还知道其行的模式(即数据字段)。虽然 Dataframe 看起来像常规的 RDD,但它们在内部以更有效的方式存储数据,利用了它们的模式。此外,它们提供了 RDD 上不可用的新操作,例如运行 SQL 查询的能力。数据框可以从外部数据源、查询结果或常规 RDD 中创建。
参考:Zaharia M.,等。学习火花(O'Reilly,2015 年)
我希望它有帮助!
Spark RDD (resilient distributed dataset)
:
RDD 是核心数据抽象 API,从 Spark 的第一个版本(Spark 1.0)开始就可用。它是用于操作分布式数据集合的较低级别的 API。RDD API 公开了一些非常有用的方法,可用于对底层物理数据结构进行非常严格的控制。它是分布在不同机器上的分区数据的不可变(只读)集合。RDD 可以在大型集群上进行内存计算,以容错的方式加速大数据处理。为了实现容错,RDD 使用由一组顶点和边组成的 DAG(有向无环图)。DAG 中的顶点和边分别代表 RDD 和要应用于该 RDD 的操作。RDD 上定义的转换是惰性的,仅在调用操作时执行
Spark DataFrame
:
Spark 1.3 引入了两个新的数据抽象 API——DataFrame 和 DataSet。DataFrame API 将数据组织成命名列,就像关系数据库中的表一样。它使程序员能够在分布式数据集合上定义模式。DataFrame 中的每一行都是对象类型的行。与 SQL 表一样,DataFrame 中的每一列必须具有相同的行数。简而言之,DataFrame 是惰性评估计划,它指定需要对分布式数据集合执行的操作。DataFrame 也是一个不可变的集合。
Spark DataSet
:
作为 DataFrame API 的扩展,Spark 1.3 还引入了 DataSet API,它在 Spark 中提供了严格类型化和面向对象的编程接口。它是不可变的、类型安全的分布式数据集合。与 DataFrame 一样,DataSet API 也使用 Catalyst 引擎来启用执行优化。DataSet 是 DataFrame API 的扩展。
Other Differences
-
一个。RDD (Spark1.0) —> 数据框(Spark1.3)-> 数据集(Spark1.6)
湾。RDD 让我们决定如何做,这限制了 Spark 可以对底层处理进行的优化。dataframe/dataset 让我们决定我们想要做什么,并将一切都留在 Spark 上来决定如何进行计算。
C。RDD 作为内存中的 jvm 对象,RDD 涉及垃圾收集和 Java(或更好的 Kryo)序列化的开销,这些开销在数据增长时会很昂贵。那就是降低性能。
数据框架提供了比 RDD 更大的性能改进,因为它具有 2 个强大的功能:
d。数据集(Project Tungsten 和 Catalyst Optimizer)如何在数据帧上得分是它具有的附加功能:编码器
DataFrame是具有模式的 RDD 。您可以将其视为关系数据库表,因为每一列都有一个名称和一个已知类型。DataFrames的强大之处在于,当您从结构化数据集(Json、Parquet..)创建 DataFrame 时,Spark 能够通过遍历整个(Json、Parquet..)数据集来推断模式。正在加载。然后,在计算执行计划时,Spark 可以使用模式并进行更好的计算优化。请注意,DataFrame在 Spark v1.3.0 之前称为 SchemaRDD
火花 RDD –
RDD 代表弹性分布式数据集。它是只读分区的记录集合。RDD 是 Spark 的基础数据结构。它允许程序员以容错方式在大型集群上执行内存计算。因此,加快任务。
火花数据框-
与 RDD 不同,数据组织成命名列。例如关系数据库中的表。它是一个不可变的分布式数据集合。Spark 中的 DataFrame 允许开发人员将结构强加到分布式数据集合上,从而实现更高级别的抽象。
火花数据集-
Apache Spark 中的数据集是 DataFrame API 的扩展,它提供类型安全、面向对象的编程接口。Dataset 通过将表达式和数据字段暴露给查询计划器来利用 Spark 的 Catalyst 优化器。