使用 Apache Toree 可以在 Spark 上执行任意表达式。假设我们要执行一些 SQL 查询,例如:sqlContext.sql(..)
这样的 SQL 查询是否有可能取得进展(比如在 Zeppelin 中)?也许 Toree 可以提供一些查询指标(如X tasks from N are done
)?
使用 Apache Toree 可以在 Spark 上执行任意表达式。假设我们要执行一些 SQL 查询,例如:sqlContext.sql(..)
这样的 SQL 查询是否有可能取得进展(比如在 Zeppelin 中)?也许 Toree 可以提供一些查询指标(如X tasks from N are done
)?
Apache Zeppelin 的使用方式是通过 sc.dagScheduler。
如果不能直接访问 SparkContext,REST API应该是更好的选择。
package org.apache.zeppelin.spark
class SparkInterpreter {
@Override
public int getProgress(InterpreterContext context) {
String jobGroup = getJobGroup(context);
int completedTasks = 0;
int totalTasks = 0;
DAGScheduler scheduler = sc.dagScheduler();
if (scheduler == null) {
return 0;
}
HashSet<ActiveJob> jobs = scheduler.activeJobs();
if (jobs == null || jobs.size() == 0) {
return 0;
}
Iterator<ActiveJob> it = jobs.iterator();
while (it.hasNext()) {
ActiveJob job = it.next();
String g = (String) job.properties().get("spark.jobGroup.id");
if (jobGroup.equals(g)) {
int[] progressInfo = null;
try {
Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
if (sparkVersion.getProgress1_0()) {
progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
} else {
progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
}
} catch (IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException
| SecurityException e) {
logger.error("Can't get progress info", e);
return 0;
}
totalTasks += progressInfo[0];
completedTasks += progressInfo[1];
}
}
if (totalTasks == 0) {
return 0;
}
return completedTasks * 100 / totalTasks;
}
}