I have the following Synapse pipeline with 4 notebook activities, running on a large Spark pool. The second notebook throws TaskCanceledException: A task was canceled. Can anyone help me with this?
Notebook code:-
import java.text.SimpleDateFormat
import java.util.Calendar

val format = new SimpleDateFormat("yyyy-MM-dd") // store the current date
println(format.format(Calendar.getInstance().getTime()))

if (metadata_df.count() > 0) {
  for (row <- metadata_df.rdd.collect) {
    val a = row(0)
    println(a)
    val Input_Path = f"abfss://$container_name@$account_name.dfs.core.windows.net/$relative_path/" +
      format.format(Calendar.getInstance().getTime()) + "/" + a + "/*"
    val DF = spark.read.option("mergeSchema", true).format("parquet").load(Input_Path)
    val DF1 = DF.drop("LastModifytime")
    val Out_Path = f"abfss://$container_name@$account_name.dfs.core.windows.net/Staging/" + a + "/HotData/"
    DF1.repartition(1).write.format("parquet").mode("overwrite").save(Out_Path)
  }
}
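For reference, a minimal plain-Scala sketch (no Spark needed) of how the dated input path above gets assembled; container_name, account_name, relative_path and the table name "Orders" are placeholder values, and java.time is used instead of SimpleDateFormat because it is thread-safe:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object PathSketch {
  // Placeholder values standing in for the notebook's variables.
  val container_name = "mycontainer"
  val account_name   = "myaccount"
  val relative_path  = "raw"

  // Build the same dated path the notebook builds, using the
  // thread-safe java.time API rather than SimpleDateFormat.
  def inputPath(tableName: String, date: LocalDate): String = {
    val day = date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    f"abfss://$container_name@$account_name.dfs.core.windows.net/$relative_path/$day/$tableName/*"
  }

  def main(args: Array[String]): Unit = {
    println(inputPath("Orders", LocalDate.of(2023, 1, 15)))
  }
}
```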
Pipeline snapshot:- [image of the pipeline]
Configuration
%%configure session -f
{
    // You can get a list of valid parameters to configure the session from https://github.com/cloudera/livy#request-body.
    "driverMemory": "112g", // Recommended values: ["28g", "56g", "112g", "224g", "400g", "472g"]
    "driverCores": 16, // Recommended values: [4, 8, 16, 32, 64, 80]
    "executorMemory": "112g",
    "executorCores": 16,
    //"jars": ["abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>/myjar.jar", "wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>/myjar1.jar"],
    "conf":
    {
        // Example of a standard Spark property; more available properties are listed at https://spark.apache.org/docs/latest/configuration.html#application-properties.
        "spark.driver.maxResultSize": "64g",
        // Example of a customized property; you can specify the number of rows Spark SQL returns by configuring "livy.rsc.sql.num-rows".
        "livy.rsc.sql.num-rows": "300000",
        "spark.executor.heartbeatInterval": 10
    }
}
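One thing worth double-checking in the config above: spark.executor.heartbeatInterval is a duration, and if I'm reading Spark's config handling correctly, a bare number is interpreted as milliseconds, so 10 means 10 ms rather than the default 10 s. A sketch of the same "conf" block with an explicit unit (an assumption to verify against the Spark configuration docs, not a confirmed fix):

```json
"conf":
{
    "spark.driver.maxResultSize": "64g",
    "livy.rsc.sql.num-rows": "300000",
    // Explicit time unit; without it, 10 would likely be read as 10 milliseconds.
    "spark.executor.heartbeatInterval": "10s"
}
```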