1

我有一个使用 DataFrame 进行计算的代码。

+------------------------------------+------------+----------+----+------+
|                                Name|        Role|Experience|Born|Salary|
+------------------------------------+------------+----------+----+------+
|    瓮䇮滴ୗ┦附䬌┊ᇕ鈃디蠾综䛿ꩁ翨찘...   |    охранник|        16|1960|108111|
|     擲鱫뫉ܞ琱폤縭ᘵ௑훧귚۔᡺♧䋐滜컑...   |       повар|        14|1977| 40934|
| 㑶뇨⿟ꄳ壚ᗜ㙣޲샾ꎓ㌸翧쉟梒靻駌푤...        |   геодезист|        29|1997| 27335|
|       ࣆ᠘䬆䨎⑁烸ᯠણ ᭯몇믊ຮ쭧닕㟣紕...    | не охранн. |        4|1999 | 30000|
... ... ...

我试图以不同的方式缓存表格。

    def processDataFrame(mode: String): Long = {
    val t0 = System.currentTimeMillis
    val topDf = df.filter(col("Salary").>(50000))

    val cacheDf = mode match {
          case "CACHE" => topDf.cache()
          case "PERSIST" => topDf.persist()
          case "CHECKPOINT" => topDf.checkpoint()
          case "CHECKPOINT_NON_EAGER" => topDf.checkpoint(false)
          case _ => topDf
          }

    val roleList = cacheDf.groupBy("Role")
                          .count()
                          .orderBy("Role")
                          .collect()
    val bornList = cacheDf.groupBy("Born")
                          .count()
                          .orderBy(col("Born").desc)
                          .collect()

    val t1 = System.currentTimeMillis()

   
    t1-t0   // time result
  }

我得到了让我思考的结果。

在此处输入图像描述

为什么 checkpoint(false) 比 persist() 更有效?毕竟,检查点需要时间来序列化对象并将它们写入磁盘。

PS 我在 GitHub 上的小项目:https ://github.com/MinorityMeaning/CacheCheckpoint

4

1 回答 1

1

我还没有检查你的项目,但我认为值得进行一次小讨论。我希望您清楚地指出您没有运行此代码一次,而是对多次运行进行平均,以确定此特定数据集的性能。(不是效率)Spark 集群可能有很多噪音,会导致作业之间的差异,并且确实需要平均几次运行来确定性能。有几个性能因素(数据局部性/Spark 执行器、资源争用等)

我认为您不能说“高效”,因为这些功能实际上执行两种不同的功能。由于他们所做的事情,他们也会在不同的情况下表现出不同的表现。有时您需要检查点、截断数据沿袭或在计算量非常大的操作之后。有时,重新计算谱系实际上比从磁盘读写更便宜。

简单的规则是,如果您要多次使用此表/DataFrame/DataSet 将其缓存在内存中。(不是磁盘)

一旦您遇到未完成的工作的问题,请考虑可以调整的内容。从代码角度/查询角度。

之后...

当且仅当这与复杂作业的失败有关并且您看到执行程序失败时,才考虑使用磁盘来保存数据。这应该始终是故障排除的后续步骤,而不是故障排除的第一步。

于 2021-10-18T17:47:47.543 回答