This happens because of how caching works in Spark.
When you call an operation on a DataFrame, RDD, or Dataset, execution follows a plan. Consider:
val df = sc.parallelize(1 to 10000).toDF("line")
df.withColumn("new_line", col("line") * 10).queryExecution
The queryExecution method returns the plan. See the logical plan for this code below:
== Parsed Logical Plan ==
Project [*,('line * 10) AS new_line#7]
+- Project [_1#4 AS line#5]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at
== Analyzed Logical Plan ==
line: int, new_line: int
Project [line#5,(line#5 * 10) AS new_line#7]
+- Project [_1#4 AS line#5]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at
== Optimized Logical Plan ==
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at
== Physical Plan ==
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7]
+- Scan ExistingRDD[_1#4]
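As a side note, a more convenient way to print all four plan stages (parsed, analyzed, optimized, physical) is the explain method with the extended flag. This is a minimal sketch, assuming the same spark-shell session and imports as the snippet above:

```scala
// explain(true) prints the parsed, analyzed, optimized, and physical
// plans to the console, without inspecting queryExecution directly.
val df = sc.parallelize(1 to 10000).toDF("line")
df.withColumn("new_line", col("line") * 10).explain(true)
```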
Here you can see every step your code will execute. When you call the cache function like this:
df.withColumn("new_line", col("line") * 10).cache().queryExecution
the result will look like this:
== Parsed Logical Plan ==
'Project [*,('line * 10) AS new_line#8]
+- Project [_1#4 AS line#5]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34
== Analyzed Logical Plan ==
line: int, new_line: int
Project [line#5,(line#5 * 10) AS new_line#8]
+- Project [_1#4 AS line#5]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34
== Optimized Logical Plan ==
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None
== Physical Plan ==
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro...
This execution returns an InMemoryRelation in the optimized logical plan, which keeps the data structure in memory, or spills it to disk if your data is very large.
Persisting it across the cluster takes time, so the first execution will be a bit slower, but whenever you access the same data again elsewhere, the DataFrame or RDD is already stored and Spark will not execute the plan again.
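The behavior described above can be sketched as follows. Note that for DataFrames the default storage level is MEMORY_AND_DISK, which matches the StorageLevel(true, true, ...) shown in the cached plan: partitions that do not fit in memory spill to disk. This assumes the same spark-shell session as the earlier snippets:

```scala
// Hedged sketch: the first action materializes the cache;
// subsequent actions scan the InMemoryRelation instead of recomputing.
import org.apache.spark.storage.StorageLevel

val df = sc.parallelize(1 to 10000).toDF("line")
  .withColumn("new_line", col("line") * 10)

// Equivalent to cache() for a DataFrame: memory first, spill to disk.
df.persist(StorageLevel.MEMORY_AND_DISK)

df.count()  // first action: executes the plan and populates the cache (slower)
df.count()  // second action: reads from the cached InMemoryRelation

df.unpersist()  // free the cached blocks when you no longer need them
```

Calling cache or persist is lazy: nothing is stored until the first action runs.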