5

action据我了解, Spark中每个人都有一份工作。
但我经常看到一个操作触发了多个作业。我试图通过对数据集进行简单的聚合来测试这一点,以获得每个类别的最大值(这里是“主题”字段)

在检查 Spark UI 时,我可以看到为该操作执行了 3 个“作业” groupBy,而我期望只有一个。
谁能帮我理解为什么有 3 而不是只有 1?

   students.show(5)

    +----------+--------------+----------+----+-------+-----+-----+
    |student_id|exam_center_id|   subject|year|quarter|score|grade|
    +----------+--------------+----------+----+-------+-----+-----+
    |         1|             1|      Math|2005|      1|   41|    D|
    |         1|             1|   Spanish|2005|      1|   51|    C|
    |         1|             1|    German|2005|      1|   39|    D|
    |         1|             1|   Physics|2005|      1|   35|    D|
    |         1|             1|   Biology|2005|      1|   53|    C|
    |         1|             1|Philosophy|2005|      1|   73|    B|
    

  // Task : Find Highest Score in each subject
  val highestScores = students.groupBy("subject").max("score")
  highestScores.show(10)

+----------+----------+
|   subject|max(score)|
+----------+----------+
|   Spanish|        98|
|Modern Art|        98|
|    French|        98|
|   Physics|        98|
| Geography|        98|
|   History|        98|
|   English|        98|
|  Classics|        98|
|      Math|        98|
|Philosophy|        98|
+----------+----------+
only showing top 10 rows

在检查 Spark UI 时,我可以看到为该操作执行了 3 个“作业” groupBy,而我期望只有一个。 在此处输入图像描述

在此处输入图像描述 谁能帮我理解为什么有 3 而不是只有 1?

== Physical Plan ==
*(2) HashAggregate(keys=[subject#12], functions=[max(score#15)])
+- Exchange hashpartitioning(subject#12, 1)
   +- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
      +- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>
4

2 回答 2

2

我认为只有#3 完成了实际的“工作”(执行一个计划,如果您在 SQL 选项卡上打开查询的详细信息,您将看到该计划)。另外两个是准备步骤——

  • #1 正在查询 NameNode 以构建InMemoryFileIndex以读取您的 csv,并且
  • #2 是对数据集进行采样以执行.groupBy("subject").max("score"),这在内部需要一个sortByKey这里有更多详细信息)。
于 2020-06-27T17:03:38.047 回答
0

我建议检查物理计划-

highestScores.explain()

你可能会看到类似的东西——

*(2) HashAggregate(keys=[subject#9], functions=[max(score#12)], output=[subject#9, max(score)#51])
+- Exchange hashpartitioning(subject#9, 2)
   +- *(1) HashAggregate(keys=[subject#9], functions=[partial_max(score#12)], output=[subject#9, max#61])
  1. [Map stage] stage#1 是实现局部聚合(部分聚合),然后使用hashpartitioning(subject). 注意 hashpartitioner 使用group by
  2. 【Reduce stage】stage#2是将stage#1的输出合并为finalmax(score)
  3. 这实际上是用来打印前 10 条记录的show(10)
于 2020-06-28T03:49:29.753 回答