1

我正在学习 Spark 的 CodeGen 机制,但对 Spark 将 RDD 的转换/操作转换为逻辑计划的方式感到困惑。Spark 应用程序如下:

 def sparkTest(): Any = {

    val spark = SparkInit.spark

    import spark.implicits._

    val data = Seq(1, 2, 3, 4, 5, 6, 7, 8)

    // closure start

    val const = 3

    def mulBy(factor: Double) = (x: Double) => factor * x

    val mulByval = mulBy(const)

    // closure end

    val testRDD = data.toDS()
    val filterRDD = testRDD.filter(i =>
      mulByval(i) <= 7
    )

    filterRDD.collect()

    filterRDD.foreach(i =>
      println(i)
    )


  }

我试图跟踪源代码,但发现当代码转到 Dataset.collect 时,已经生成了 queryExecution。

 def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

查询执行如下

== Parsed Logical Plan ==
'TypedFilter <function1>, int, [StructField(value,IntegerType,false)], unresolveddeserializer(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))
+- LocalRelation [value#46]

== Analyzed Logical Plan ==
value: int
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], cast(value#46 as int)
+- LocalRelation [value#46]

== Optimized Logical Plan ==
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], value#46: int
+- LocalRelation [value#46]

== Physical Plan ==
*Filter <function1>.apply$mcZI$sp
+- LocalTableScan [value#46]

但我找不到逻辑计划生成的时间和地点。有什么我错过的吗?

4

1 回答 1

2

这里有一些混淆。RDD api 实际上并不生成计划。它们作为数据集用来工作的原始或旧 api 存在。在您的特定示例中,当您编写此行时,查询计划开始构建(尽管它是惰性的)。

val testRDD = data.toDS()

在此之后你不再有一个 RDD,你有一个数据集,它是从“数据”的 linage 的结果编码的。您可以通过调用 explain 方法来查看任何数据集的计划以获取更多详细信息。

所以总结一下

  • 每个 DataSet 在初始化时都有一个queryExecution设计。
  • RDD 没有逻辑计划(或任何代码生成)
  • RDDS 变成 DataSets 有一个计划,其第一步是运行 RDD 的依赖树。

更详细地查看您的代码,实际上从未使用过 RDD。您从 Collection 开始,然后直接转到创建 LocalTableScan 的 Dataset,它基本上只是将值转换为 InternalRowRepresentation 并将它们并行化。有关详细信息,请参阅LocalTableScanExec

于 2017-10-26T15:49:25.530 回答