我有 2 个 spark RDD、dataRDD 和 newPairDataRDD,它们用于 spark SQL 查询。当我的应用程序初始化时,dataRDD 将被初始化。一个指定的 hbase 实体中的所有数据都将存储到 dataRDD。
当客户端的 sql 查询到来时,我的 APP 将获得所有新的更新和插入到 newPairDataRDD。dataRDD 联合 newPairDataRDD 并在 spark SQL 上下文中注册为表。
我什至在 dataRDD 中发现了 0 条记录,在 newPairDataRDD 中发现了 1 条新插入记录。联合需要 4 秒。这太慢了
我认为这是不合理的。任何人都知道如何使它更快?感谢下面的简单代码
// Step1: load all data from hbase to dataRDD when initial, this only run once.
JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD();
dataRDD.cache();
dataRDD.persist(StorageLevel.MEMORY_ONLY());
logger.info(dataRDD.count());
// Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD
JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
// Step3: if count>0 do union and reduce
if(newPairDataRDD.count() > 0) {
JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);
// if data was updated in DB, need to delete the old version from the dataRDD.
dataRDD = unionedRDD.reduceByKey(
new Function2<Row, Row, Row>() {
// @Override
public Row call(Row r1, Row r2) {
return r2;
}
});
}
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);
//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
从 spark web ui,我可以看到下面。显然它需要4s来联合
已完成的阶段 (8)
StageId 描述 提交的持续时间 任务:成功/总输入 Shuffle Read Shuffle Write
6 在 SparkPlan.scala 收集:85+详细信息 1/4/2015 8:17 2 s 8-Aug 156.0 B
SparkSqlQueryForMarsNew.java:389+details 的 7 联合 1/4/2015 8:17 4 s 8-Aug 64.0 B 156.0 B