我有一个使用 DataFrame 进行计算的代码。
+------------------------------------+------------+----------+----+------+
| Name| Role|Experience|Born|Salary|
+------------------------------------+------------+----------+----+------+
| 瓮䇮滴ୗ┦附䬌┊ᇕ鈃디蠾综䛿ꩁ翨찘... | охранник| 16|1960|108111|
| 擲鱫뫉ܞ琱폤縭ᘵ훧귚۔♧䋐滜컑... | повар| 14|1977| 40934|
| 㑶뇨ꄳ壚ᗜ㙣샾ꎓ㌸翧쉟梒靻駌푤... | геодезист| 29|1997| 27335|
| ࣆ᠘䬆䨎⑁烸ᯠણ ᭯몇믊ຮ쭧닕㟣紕... | не охранн. | 4|1999 | 30000|
... ... ...
我试图以不同的方式缓存表格。
def processDataFrame(mode: String): Long = {
val t0 = System.currentTimeMillis
val topDf = df.filter(col("Salary").>(50000))
val cacheDf = mode match {
case "CACHE" => topDf.cache()
case "PERSIST" => topDf.persist()
case "CHECKPOINT" => topDf.checkpoint()
case "CHECKPOINT_NON_EAGER" => topDf.checkpoint(false)
case _ => topDf
}
val roleList = cacheDf.groupBy("Role")
.count()
.orderBy("Role")
.collect()
val bornList = cacheDf.groupBy("Born")
.count()
.orderBy(col("Born").desc)
.collect()
val t1 = System.currentTimeMillis()
t1-t0 // time result
}
我得到了让我思考的结果。
为什么 checkpoint(false) 比 persist() 更有效?毕竟,检查点需要时间来序列化对象并将它们写入磁盘。
PS 我在 GitHub 上的小项目:https ://github.com/MinorityMeaning/CacheCheckpoint