我有以下代码hiveContext.sql()
大部分时间都会触发。我的任务是我想创建几个表并在处理完所有配置单元表分区后将值插入。
所以我首先触发show partitions
并在 for 循环中使用它的输出,我调用了一些方法来创建表(如果它不存在)并使用hiveContext.sql
.
现在,我们不能在执行器hiveContext
中执行,所以我必须在驱动程序的 for 循环中执行它,并且应该一个接一个地串行运行。当我在 YARN 集群中提交此 Spark 作业时,几乎所有时间我的执行程序都因为未找到 shuffle 异常而丢失。
现在发生这种情况是因为 YARN 由于内存过载而杀死了我的执行程序。我不明白为什么,因为我为每个配置单元分区都有一个非常小的数据集,但它仍然会导致 YARN 杀死我的执行程序。
以下代码会并行执行所有操作并尝试同时容纳内存中的所有配置单元分区数据吗?
public static void main(String[] args) throws IOException {
SparkConf conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);
DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")");
Row[] rowArr = partitionFrame.collect();
for(Row row : rowArr) {
String[] splitArr = row.getString(0).split("/");
String server = splitArr[0].split("=")[1];
String date = splitArr[1].split("=")[1];
String csvPath = "hdfs:///user/db/ext/"+server+".csv";
if(fs.exists(new Path(csvPath))) {
hiveContext.sql("ADD FILE " + csvPath);
}
createInsertIntoTableABC(hc,entity, date);
createInsertIntoTableDEF(hc,entity, date);
createInsertIntoTableGHI(hc,entity,date);
createInsertIntoTableJKL(hc,entity, date);
createInsertIntoTableMNO(hc,entity,date);
}
}