313

我只是想知道 Apache Spark 中的RDDDataFrame (Spark 2.0.0 DataFrame 只是类型别名Dataset[Row]有什么区别?

你能把一个转换成另一个吗?

4

14 回答 14

268

DataFrame通过谷歌搜索“DataFrame 定义”可以很好地定义A :

数据框是一个表格或二维数组结构,其中每一列包含一个变量的测量值,每一行包含一个案例。

因此,DataFrame由于其表格格式,a 具有额外的元数据,这允许 Spark 对最终查询运行某些优化。

RDD另一方面,An只是一个R弹性D分布式数据集,它更像是一个无法优化的数据黑盒,因为可以对其执行的操作不受限制

RDD但是,您可以通过它的方法从一个 DataFrame 转到一个rdd,并且您可以通过该方法从一个转到RDD一个DataFrame(如果 RDD 是表格格式)toDF

通常DataFrame,由于内置查询优化,建议尽可能使用。

于 2015-07-20T03:09:05.453 回答
263

首先是DataFrameSchemaRDD.

不赞成使用 SchemaRDD 的方法

是的.. 和之间的转换DataframeRDD绝对可能的。

下面是一些示例代码片段。

  • df.rddRDD[Row]

以下是创建数据框的一些选项。

  • 1)yourrddOffrow.toDF转换为DataFrame.

  • 2)使用createDataFramesql上下文

    val df = spark.createDataFrame(rddOfRow, schema)

其中 schema 可以来自以下一些选项,如 nice SO post 所述。
来自 scala case class 和 scala reflection api

import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]

或使用Encoders

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema

如 Schema 所述,也可以使用StructType和 创建StructField

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true)) etc...

图片说明

事实上,现在有 3 个 Apache Spark API。

在此处输入图像描述

  1. RDD接口:

自 1.0 版本以来,RDD(弹性分布式数据集)API 一直在 Spark 中。

API 提供了RDD许多转换方法,例如map()、 filter() 和reduce(),用于对数据执行计算。这些方法中的每一个都会产生一个新RDD的表示转换后的数据。但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换。操作方法的示例是collect() 和 saveAsObjectFile()。

RDD 示例:

rdd.filter(_.age > 21) // transformation
   .map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action

示例:使用 RDD 按属性过滤

rdd.filter(_.age > 21)
  1. DataFrameAPI

Spark 1.3 引入了一个新的DataFrameAPI 作为 Project Tungsten 计划的一部分,该计划旨在提高 Spark 的性能和可扩展性。API 引入了模式的DataFrame概念来描述数据,允许 Spark 以比使用 Java 序列化更有效的方式管理模式并仅在节点之间传递数据。

API与DataFrameAPI 完全不同,RDD因为它是用于构建关系查询计划的 API,然后 Spark 的 Catalyst 优化器可以执行该计划。API 对于熟悉构建查询计划的开发人员来说是很自然的

示例 SQL 样式:

df.filter("age > 21");

限制: 因为代码是按名称引用数据属性,所以编译器不可能捕获任何错误。如果属性名称不正确,则仅在运行时创建查询计划时才会检测到错误。

API 的另一个缺点DataFrame是它非常以 scala 为中心,虽然它确实支持 Java,但支持有限。

例如,当DataFrame从现有RDD的 Java 对象创建 a 时,Spark 的 Catalyst 优化器无法推断模式并假定 DataFrame 中的任何对象都实现了该scala.Product接口。Scalacase class开箱即用,因为他们实现了这个接口。

  1. DatasetAPI

DatasetAPI 在 Spark 1.6 中作为 API 预览版发布,旨在提供两全其美的体验;熟悉的面向对象编程风格和RDDAPI 的编译时类型安全,但具有 Catalyst 查询优化器的性能优势。DataFrame数据集也使用与API相同的高效堆外存储机制 。

当涉及到序列化数据时,DatasetAPI 具有 编码器的概念,它在 JVM 表示(对象)和 Spark 的内部二进制格式之间进行转换。Spark 具有非常先进的内置编码器,它们生成字节码以与堆外数据交互,并提供对单个属性的按需访问,而无需反序列化整个对象。Spark 尚未提供用于实现自定义编码器的 API,但计划在未来的版本中提供。

此外,该DatasetAPI 旨在与 Java 和 Scala 同等地工作。使用 Java 对象时,它们完全符合 bean 是很重要的。

示例DatasetAPI SQL 样式:

dataset.filter(_.age < 21);

评价不同。DataFrame&之间DataSet 在此处输入图像描述

凯利板级流量。.(来自 spark 峰会的揭秘 DataFrame 和 Dataset 演示文稿) 在此处输入图像描述

进一步阅读... databricks文章 - 三个 Apache Spark API 的故事:RDD 与 DataFrames 和 Datasets

于 2016-08-19T07:23:53.477 回答
153

Apache Spark 提供三种类型的 API

  1. RDD
  2. 数据框
  3. 数据集

比较 RDD、Dataframe 和 Dataset API

这是 RDD、Dataframe 和 Dataset 之间的 API 比较。

RDD

Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是跨集群节点分区的元素集合,可以并行操作。

RDD特点:-

  • 分布式收集:
    RDD 使用 MapReduce 操作,该操作被广泛用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错。

  • 不可变的: RDD 由分区的记录集合组成。分区是RDD中并行的基本单元,每个分区是数据的一个逻辑划分,是不可变的,是通过对现有分区的一些变换而创建的。不可变性有助于实现计算的一致性。

  • 容错性: 如果我们丢失了 RDD 的某个分区,我们可以在 lineage 中重放该分区上的转换以实现相同的计算,而不是跨多个节点进行数据复制。这个特性是 RDD 的最大好处,因为它节省了在数据管理和复制方面付出了很多努力,从而实现了更快的计算。

  • 惰性计算: Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记得应用于某些基础数据集的转换。仅当操作需要将结果返回给驱动程序时才计算转换。

  • 功能转换: RDD 支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在对数据集运行计算后将值返回给驱动程序)。

  • 数据处理格式:
    它可以轻松有效地处理结构化数据和非结构化数据。

  • 支持的编程语言:
    RDD API 在 Java、Scala、Python 和 R 中可用。

RDD 限制:-

  • 没有内置优化引擎: 在处理结构化数据时,RDD 无法利用 Spark 的高级优化器,包括催化剂优化器和 Tungsten 执行引擎。开发者需要根据每个 RDD 的属性对其进行优化。

  • 处理结构化数据: 与 Dataframe 和数据集不同,RDD 不会推断摄取数据的模式,并且需要用户指定它。

数据框

Spark 在 Spark 1.3 版本中引入了 Dataframes。Dataframe 克服了 RDD 所面临的关键挑战。

DataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库或 R/Python Dataframe 中的表。除了 Dataframe,Spark 还引入了催化剂优化器,它利用高级编程特性来构建可扩展的查询优化器。

数据框特点:-

  • 行对象 的分布式集合:DataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库中的表,但在底层进行了更丰富的优化。

  • 数据处理: 处理结构化和非结构化数据格式(Avro、CSV、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE 表、MySQL 等)。它可以从所有这些不同的数据源中读取和写入。

  • 使用催化剂优化器进行优化: 它支持 SQL 查询和 DataFrame API。Dataframe分四个阶段使用催化剂树转换框架,

     1.Analyzing a logical plan to resolve references
     2.Logical plan optimization
     3.Physical planning
     4.Code generation to compile parts of the query to Java bytecode.
    
  • Hive 兼容性: 使用 Spark SQL,您可以在现有 Hive 仓库上运行未修改的 Hive 查询。它重用 Hive 前端和 MetaStore,并为您提供与现有 Hive 数据、查询和 UDF 的完全兼容性。

  • Tungsten: Tungsten 提供了一个物理执行后端,它显式地管理内存并动态生成用于表达式评估的字节码。

  • 支持的编程语言:
    Dataframe API 在 Java、Scala、Python 和 R 中可用。

数据框限制:-

  • 编译时类型安全性: 如前所述,Dataframe API 不支持编译时安全性,这会限制您在不知道结构时操作数据。以下示例在编译期间有效。但是,执行此代码时会出现运行时异常。

例子:

case class Person(name : String , age : Int) 
val dataframe = sqlContext.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name

当您处理多个转换和聚合步骤时,这尤其具有挑战性。

  • 无法对域对象进行操作(丢失的域对象): 一旦将域对象转换为数据框,就无法从中重新生成它。在下面的示例中,一旦我们从 personRDD 创建了 personDF,我们将不会恢复 Person 类的原始 RDD (RDD[Person])。

例子:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

数据集 API

Dataset API 是 DataFrames 的扩展,它提供了一个类型安全的、面向对象的编程接口。它是映射到关系模式的强类型、不可变对象集合。

在 Dataset 的核心,API 是一个称为编码器的新概念,它负责在 JVM 对象和表格表示之间进行转换。表格表示使用 Spark 内部 Tungsten 二进制格式存储,允许对序列化数据进行操作并提高内存利用率。Spark 1.6 支持为各种类型自动生成编码器,包括原始类型(例如 String、Integer、Long)、Scala 案例类和 Java Bean。

数据集特点:-

  • 提供最好的 RDD 和 Dataframe: RDD(函数式编程,类型安全),DataFrame(关系模型,查询优化,Tungsten 执行,排序和改组)

  • 编码器: 使用编码器,可以很容易地将任何 JVM 对象转换为数据集,允许用户处理结构化和非结构化数据,这与 Dataframe 不同。

  • 支持的编程语言: Datasets API 目前仅在 Scala 和 Java 中可用。1.6 版目前不支持 Python 和 R。Python 支持计划于 2.0 版。

  • 类型安全: Datasets API 提供了 Dataframes 中不可用的编译时安全性。在下面的示例中,我们可以看到 Dataset 如何使用编译 lambda 函数对域对象进行操作。

例子:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
 // error : value salary is not a member of person
ds.rdd // returns RDD[Person]
  • 可互操作:数据集允许您轻松地将现有的 RDD 和数据帧转换为数据集,而无需样板代码。

数据集 API 限制:-

  • 需要类型转换为字符串: 目前从数据集中查询数据需要我们将类中的字段指定为字符串。一旦我们查询了数据,我们就被迫将列转换为所需的数据类型。另一方面,如果我们对 Datasets 使用 map 操作,它将不会使用 Catalyst 优化器。

例子:

ds.select(col("name").as[String], $"age".as[Int]).collect()

不支持 Python 和 R:从 1.6 版开始,数据集仅支持 Scala 和 Java。Spark 2.0 将引入 Python 支持。

与现有的 RDD 和 Dataframe API 相比,Datasets API 带来了几个优势,具有更好的类型安全性和函数式编程。面对 API 中类型转换要求的挑战,您仍然无法满足所需的类型安全性,并且会使您的代码变得脆弱。

于 2016-09-22T05:50:54.423 回答
62

一张图片中的所有(RDD、DataFrame 和 DataSet)。

RDD vs DataFrame vs DataSet

形象学分

RDD

RDD是可以并行操作的元素的容错集合。

DataFrame

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表或 R/Python 中的数据框,但在底层进行了更丰富的优化

Dataset

Dataset是数据的分布式集合。Dataset 是 Spark 1.6 中添加的一个新接口,它提供了RDD 的优势 (强类型化、使用强大 lambda 函数的能力) 以及 Spark SQL 优化执行引擎的优势


笔记:

Scala/Java 中的 Dataset of Rows ( Dataset[Row]) 通常称为 DataFrames


所有这些都与代码片段进行了很好的比较。

RDD vs DataFrame vs DataSet 与代码

资源


问:您能否将一种转换为另一种,例如 RDD 转换为 DataFrame,反之亦然?

是的,两者都可以

1.RDDDataFrame_.toDF()

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+

更多方法:在 Spark 中将 RDD 对象转换为 Dataframe

2./to with方法DataFrame_ DataSet_RDD.rdd()

val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
于 2017-07-22T09:37:56.077 回答
31

因为DataFrame是弱类型,开发人员没有得到类型系统的好处。例如,假设您想从 SQL 中读取某些内容并在其上运行一些聚合:

val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

当您说people("deptId"),您没有取回Int或 aLong时,您正在取回Column需要对其进行操作的对象。在具有丰富类型系统的语言(例如 Scala)中,您最终会失去所有类型安全性,这会增加可能在编译时发现的运行时错误的数量。

反之,DataSet[T]是打字。当你这样做时:

val people: People = val people = sqlContext.read.parquet("...").as[People]

您实际上是在取回一个People对象,其中deptId是实际的整数类型而不是列类型,从而利用了类型系统。

从 Spark 2.0 开始,DataFrame 和 DataSet API 将统一,其中DataFrame将是DataSet[Row].

于 2016-05-18T13:39:42.793 回答
27

SimplyRDD是核心组件,不过DataFrame是 spark 1.30 中引入的一个 API。

RDD

数据分区的集合称为RDD. 这些RDD必须遵循一些属性,例如:

  • 不可变,
  • 容错,
  • 分散式,
  • 更多的。

这里RDD要么是结构化的,要么是非结构化的。

数据框

DataFrame是 Scala、Java、Python 和 R 中可用的 API。它允许处理任何类型的结构化和半结构化数据。为了定义DataFrame,组织成命名列的分布式数据集合称为DataFrame. 您可以轻松地优化RDDs. DataFrame您可以使用 . 一次处理 JSON 数据、parquet 数据、HiveQL 数据DataFrame

val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")

val sample_DF = sampleRDD.toDF()

这里 Sample_DF 认为DataFrame. sampleRDD被(原始数据)称为RDD

于 2015-10-30T14:07:43.467 回答
13

大部分答案都是正确的,只想在这里补充一点

在 Spark 2.0 中,这两个 API(DataFrame +DataSet)将被统一为一个 API。

“统一DataFrame和Dataset:在Scala和Java中,DataFrame和Dataset已经统一了,即DataFrame只是Dataset of Row的类型别名。在Python和R中,由于缺乏类型安全,DataFrame是主要的编程接口。”

数据集与 RDD 类似,但是,它们不使用 Java 序列化或 Kryo,而是使用专门的编码器来序列化对象以通过网络进行处理或传输。

Spark SQL 支持两种不同的方法将现有 RDD 转换为数据集。第一种方法使用反射来推断包含特定类型对象的 RDD 的模式。当您在编写 Spark 应用程序时已经知道架构时,这种基于反射的方法会产生更简洁的代码并且效果很好。

第二种创建数据集的方法是通过一个编程接口,它允许您构建一个模式,然后将其应用到现有的 RDD。虽然此方法更冗长,但它允许您在列及其类型直到运行时才知道时构造数据集。

在这里可以找到 RDD tof 数据框对话答案

如何将rdd对象转换为spark中的数据框

于 2016-11-20T13:53:39.133 回答
8

DataFrame 等效于 RDBMS 中的表,也可以通过与 RDD 中的“本机”分布式集合类似的方式进行操作。与 RDD 不同,Dataframes 跟踪模式并支持各种关系操作,从而实现更优化的执行。每个 DataFrame 对象代表一个逻辑计划,但由于它们的“惰性”性质,在用户调用特定的“输出操作”之前不会执行。

于 2015-09-25T03:45:06.113 回答
6

Dataframe 是 Row 对象的 RDD,每个对象代表一条记录。Dataframe 还知道其行的模式(即数据字段)。虽然 Dataframe 看起来像常规的 RDD,但它们在内部以更有效的方式存储数据,利用了它们的模式。此外,它们提供了 RDD 上不可用的新操作,例如运行 SQL 查询的能力。数据框可以从外部数据源、查询结果或常规 RDD 中创建。

参考:Zaharia M.,等。学习火花(O'Reilly,2015 年)

于 2016-10-20T13:27:26.463 回答
6

从使用角度来看,RDD 与 DataFrame 的一些见解:

  1. RDD 太棒了!因为它们为我们提供了处理几乎任何类型数据的灵活性;非结构化、半结构化和结构化数据。因为,很多时候数据还没有准备好放入 DataFrame(甚至是 JSON),RDD 可用于对数据进行预处理,以便它可以放入数据框中。RDD 是 Spark 中的核心数据抽象。
  2. 并非所有可能在 RDD 上进行的转换都可以在 DataFrame 上进行,例如,subtract() 用于 RDD,而 except() 用于 DataFrame。
  3. 由于 DataFrame 类似于关系表,因此在使用集合/关系理论转换时它们遵循严格的规则,例如,如果您想合并两个数据帧,则要求两个 dfs 具有相同的列数和关联的列数据类型。列名可以不同。这些规则不适用于 RDD。这是一个很好的教程,解释了这些事实。
  4. 正如其他人已经深入解释的那样,使用 DataFrames 会带来性能提升。
  5. 使用 DataFrames,您不需要像使用 RDD 编程时那样传递任意函数。
  6. 您需要 SQLContext/HiveContext 来编程数据帧,因为它们位于 Spark 生态系统的 SparkSQL 区域,但对于 RDD,您只需要位于 Spark Core 库中的 SparkContext/JavaSparkContext。
  7. 如果可以为它定义模式,则可以从 RDD 创建一个 df。
  8. 您还可以将 df 转换为 rdd 并将 rdd 转换为 df。

我希望它有帮助!

于 2017-08-25T21:10:09.830 回答
1

Spark RDD (resilient distributed dataset)

RDD 是核心数据抽象 API,从 Spark 的第一个版本(Spark 1.0)开始就可用。它是用于操作分布式数据集合的较低级别的 API。RDD API 公开了一些非常有用的方法,可用于对底层物理数据结构进行非常严格的控制。它是分布在不同机器上的分区数据的不可变(只读)集合。RDD 可以在大型集群上进行内存计算,以容错的方式加速大数据处理。为了实现容错,RDD 使用由一组顶点和边组成的 DAG(有向无环图)。DAG 中的顶点和边分别代表 RDD 和要应用于该 RDD 的操作。RDD 上定义的转换是惰性的,仅在调用操作时执行

Spark DataFrame

Spark 1.3 引入了两个新的数据抽象 API——DataFrame 和 DataSet。DataFrame API 将数据组织成命名列,就像关系数据库中的表一样。它使程序员能够在分布式数据集合上定义模式。DataFrame 中的每一行都是对象类型的行。与 SQL 表一样,DataFrame 中的每一列必须具有相同的行数。简而言之,DataFrame 是惰性评估计划,它指定需要对分布式数据集合执行的操作。DataFrame 也是一个不可变的集合。

Spark DataSet

作为 DataFrame API 的扩展,Spark 1.3 还引入了 DataSet API,它在 Spark 中提供了严格类型化和面向对象的编程接口。它是不可变的、类型安全的分布式数据集合。与 DataFrame 一样,DataSet API 也使用 Catalyst 引擎来启用执行优化。DataSet 是 DataFrame API 的扩展。

Other Differences-

在此处输入图像描述

于 2019-07-02T18:37:51.733 回答
1

一个。RDD (Spark1.0) —> 数据框(Spark1.3)-> 数据集(Spark1.6)

湾。RDD 让我们决定如何做,这限制了 Spark 可以对底层处理进行的优化。dataframe/dataset 让我们决定我们想要做什么,并将一切都留在 Spark 上来决定如何进行计算。

C。RDD 作为内存中的 jvm 对象,RDD 涉及垃圾收集和 Java(或更好的 Kryo)序列化的开销,这些开销在数据增长时会很昂贵。那就是降低性能。

数据框架提供了比 RDD 更大的性能改进,因为它具有 2 个强大的功能:

  1. 自定义内存管理(又名 Project Tungsten)
  2. 优化的执行计划(又名催化剂优化器)
    性能明智的 RDD -> 数据框 -> 数据集

d。数据集(Project Tungsten 和 Catalyst Optimizer)如何在数据帧上得分是它具有的附加功能:编码器

于 2021-04-06T07:00:58.797 回答
0

DataFrame是具有模式的 RDD 您可以将其视为关系数据库表,因为每一列都有一个名称和一个已知类型。DataFrames的强大之处在于,当您从结构化数据集(Json、Parquet..)创建 DataFrame 时,Spark 能够通过遍历整个(Json、Parquet..)数据集来推断模式。正在加载。然后,在计算执行计划时,Spark 可以使用模式并进行更好的计算优化。请注意,DataFrame在 Spark v1.3.0 之前称为 SchemaRDD

于 2018-01-07T22:35:19.753 回答
0

Apache Spark – RDD、DataFrame 和 DataSet

火花 RDD

RDD 代表弹性分布式数据集。它是只读分区的记录集合。RDD 是 Spark 的基础数据结构。它允许程序员以容错方式在大型集群上执行内存计算。因此,加快任务。

火花数据框-

与 RDD 不同,数据组织成命名列。例如关系数据库中的表。它是一个不可变的分布式数据集合。Spark 中的 DataFrame 允许开发人员将结构强加到分布式数据集合上,从而实现更高级别的抽象。

火花数据集-

Apache Spark 中的数据集是 DataFrame API 的扩展,它提供类型安全、面向对象的编程接口。Dataset 通过将表达式和数据字段暴露给查询计划器来利用 Spark 的 Catalyst 优化器。

于 2019-12-11T17:54:15.450 回答