我正在测试下面的程序：希望它在运行过程中写入检查点，这样当应用程序因资源不可用等任何原因失败时，重新运行可以从检查点位置读取数据并继续执行。但是，当我终止作业并重新提交后，执行总是从头开始。请问要实现这一点还缺少什么？谢谢！！
下面是代码：
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
/**
 * Joins three country reference CSV files and prints total population grouped by
 * continent name, country name and numeric country code.
 *
 * Why a `Dataset.checkpoint()`-based version always restarts from scratch: in a batch
 * job a checkpoint only truncates the lineage inside the running application. A newly
 * submitted application has no API to locate and reattach the files written under
 * `setCheckpointDir`, so it re-reads and re-joins the sources. Automatic recovery from
 * a checkpoint directory is a Spark Streaming / Structured Streaming feature
 * (`StreamingContext.getOrCreate`, the `checkpointLocation` option).
 *
 * For a batch job, resumption has to be explicit: persist the expensive intermediate
 * result to a durable, well-known path and reuse it on startup if a previous run
 * finished writing it. `loadOrCompute` does this for the joined DataFrame. Delete the
 * stage directory to force a full recomputation.
 */
object withCheckpoint {
  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.{DataFrame, SaveMode}

  /** Directory holding restartable stages when no command-line argument is given. */
  private val DefaultStageDir = "/tmp/withCheckpoint"

  /**
   * Runs the job.
   *
   * @param args optional; `args(0)` overrides the directory that holds the restartable
   *             stage (default `/tmp/withCheckpoint`). On a cluster this must be a path
   *             reachable by the driver and every executor, e.g. on HDFS.
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Build exactly one session. Creating a SparkContext first and then calling
    // builder.appName(...).getOrCreate() reuses that context, so a second app name
    // passed to the builder would be silently ignored.
    val conf = new SparkConf().setAppName("With Checkpoint")
    val spark = SparkSession.builder.config(conf).getOrCreate()

    try {
      val stageDir = args.headOption.getOrElse(DefaultStageDir)

      // Reused across restarts; replaces checkpoint() + collect(). checkpoint() is
      // already eager, so the extra collect() only shipped every joined row to the driver.
      val jnCtryCntnt = loadOrCompute(spark, s"$stageDir/jnCtryCntnt") {
        val readCtryDemoFile = spark.read.option("header", "true").csv("/tmp/Ctry_Demo.csv")
        val readCtryRefFile = spark.read.option("header", "true").csv("/tmp/ref_ctry.csv")
        val readCtryCntntFile =
          spark.read.option("header", "true").csv("/tmp/ctry_to_continent.csv")

        readCtryDemoFile
          .join(readCtryRefFile, Seq("NUM_CTRY_CD"))
          .join(readCtryCntntFile, Seq("Alpha_2_CTRY_CD"))
      }

      // One population report per aggregation key, in the same order as before.
      Seq("CONTINENT_NM", "Ctry_NM", "NUM_CTRY_CD").foreach { key =>
        jnCtryCntnt
          .groupBy(key)
          .agg(sum("POPULATION").as("SUM_POPULATION"))
          .orderBy(key)
          .show()
      }
    } finally {
      spark.stop()
    }
  }

  /**
   * Returns the DataFrame stored at `path` if a previous run completed writing it.
   * Otherwise evaluates `compute`, writes the result to `path` as Parquet and returns
   * the re-read copy.
   *
   * Completion is detected via the `_SUCCESS` marker that Hadoop's output committer
   * writes (by default) only after the whole write job commits. A directory left behind
   * by a killed run therefore lacks the marker and is recomputed and overwritten rather
   * than read half-written. If markers are disabled in the cluster configuration, this
   * degrades safely to always recomputing. Reading the Parquet copy back also cuts the
   * lineage, which is what `Dataset.checkpoint()` was providing.
   *
   * NOTE(review): Parquet rejects duplicate column names (and, on older Spark versions,
   * some characters such as spaces). Confirm the joined CSV headers are unique and
   * Parquet-safe.
   *
   * @param spark   active session
   * @param path    durable location for this stage
   * @param compute builds the DataFrame; only evaluated when no completed copy exists
   * @return the stage's data, read from `path`
   */
  private def loadOrCompute(spark: SparkSession, path: String)(compute: => DataFrame): DataFrame = {
    val successMarker = new Path(path, "_SUCCESS")
    val fs = successMarker.getFileSystem(spark.sparkContext.hadoopConfiguration)
    if (!fs.exists(successMarker)) {
      compute.write.mode(SaveMode.Overwrite).parquet(path)
    }
    spark.read.parquet(path)
  }
}