I'm learning Spark's CodeGen mechanism, but I'm confused about how Spark turns the RDD transformations/actions into a logical plan. The Spark application is as follows:
def sparkTest(): Any = {
  val spark = SparkInit.spark
  import spark.implicits._
  val data = Seq(1, 2, 3, 4, 5, 6, 7, 8)
  // closure start
  val const = 3
  def mulBy(factor: Double) = (x: Double) => factor * x
  val mulByval = mulBy(const)
  // closure end
  val testRDD = data.toDS()
  val filterRDD = testRDD.filter(i =>
    mulByval(i) <= 7
  )
  filterRDD.collect()
  filterRDD.foreach(i =>
    println(i)
  )
}
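My current understanding is that each Dataset transformation (like filter above) eagerly wraps the parent's logical plan in a new plan node, and only an action such as collect actually evaluates the accumulated tree. Here is a minimal standalone sketch of that pattern (no Spark; all names like ToyDataset are hypothetical, loosely mirroring the TypedFilter / LocalRelation nodes in the plan output below):

```scala
// Toy model of the pattern: transformations build plan nodes eagerly,
// actions walk the tree. This is an illustration, not Spark's real API.
sealed trait LogicalPlan
case class LocalRelation(rows: Seq[Int]) extends LogicalPlan
case class TypedFilter(cond: Int => Boolean, child: LogicalPlan) extends LogicalPlan

object ToyDataset {
  // Recursively evaluate the plan tree bottom-up.
  def execute(p: LogicalPlan): Seq[Int] = p match {
    case LocalRelation(rows)      => rows
    case TypedFilter(cond, child) => execute(child).filter(cond)
  }
}

class ToyDataset(val plan: LogicalPlan) {
  // filter returns a NEW dataset whose plan is a TypedFilter over the
  // parent's plan -- the "logical plan" exists as soon as filter is called.
  def filter(cond: Int => Boolean): ToyDataset =
    new ToyDataset(TypedFilter(cond, plan))

  // collect is the action that finally evaluates the accumulated plan.
  def collect(): Seq[Int] = ToyDataset.execute(plan)
}

val ds  = new ToyDataset(LocalRelation(Seq(1, 2, 3, 4, 5, 6, 7, 8)))
val out = ds.filter(i => 3.0 * i <= 7).collect()  // Seq(1, 2)
```

In this sketch the plan tree is fully formed before collect runs, which is the behavior I think I'm seeing in Spark but cannot locate in the source.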
I tried to trace through the source code, but found that by the time execution reaches Dataset.collect, the queryExecution has already been generated:
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
The query execution is as follows:
== Parsed Logical Plan ==
'TypedFilter <function1>, int, [StructField(value,IntegerType,false)], unresolveddeserializer(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))
+- LocalRelation [value#46]
== Analyzed Logical Plan ==
value: int
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], cast(value#46 as int)
+- LocalRelation [value#46]
== Optimized Logical Plan ==
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], value#46: int
+- LocalRelation [value#46]
== Physical Plan ==
*Filter <function1>.apply$mcZI$sp
+- LocalTableScan [value#46]
But I can't find when and where the logical plan is generated. Is there something I'm missing?