7

我的代码算法如下
Step1。获取一个 hbase 实体数据到 hBaseRDD

      JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
                 jsc.newAPIHadoopRDD(hbase_conf,  TableInputFormat.class,
                 ImmutableBytesWritable.class, Result.class); 

步骤 2。将 hBaseRDD 转换为 rowPairRDD

     // in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data 
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD 
                            .mapToPair(***); 
    dataRDD.repartition(500);
        dataRDD.cache();

步骤 3。将 rowPairRDD 转换为 schemaRDD

            JavaSchemaRDD schemaRDD =   sqlContext.applySchema(rowPairRDD.values(), schema); 
            schemaRDD.registerTempTable("testentity"); 
           sqlContext.sqlContext().cacheTable("testentity");

第四步。使用 spark sql 做第一个简单的 sql 查询。

   JavaSQLContext  sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE 
             column3 = 'value1' ") 
     List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

步骤 5。使用 spark sql 做第二个简单的 sql 查询。

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity 
                                     WHERE column3 = 'value2' ") 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

第六步。使用 spark sql 做第三个简单的 sql 查询。

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' "); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

测试结果如下:

测试用例1

当我插入 300,000 条记录时,hbase 实体,然后运行代码。

  • 第一次查询需要 60407 毫秒
  • 第二次查询需要 838 毫秒
  • 3td 查询需要 792 毫秒

如果我使用 hbase Api 做类似的查询,只需要 2000 毫秒。显然最后 2 个 spark sql 查询比 hbase api 查询快得多。
我相信第一个 spark sql 查询会花费大量时间从 hbase 加载数据。
所以第一个查询比最后两个查询慢得多。我认为结果是预期的

测试用例2

当我插入 400,000 条记录时。hbase 实体,然后运行代码。

  • 第一次查询需要 87213 毫秒
  • 第二次查询需要 83238 毫秒
  • 3td 查询需要 82092 毫秒

如果我使用 hbase Api 做类似的查询,只需要 3500 毫秒。显然 3 spark sql 查询比 hbase api 查询慢得多。
而且最后2个spark sql查询也很慢,性能和第一个查询差不多,为什么?如何调整性能?

4

2 回答 2

3

我怀疑您尝试缓存的数据多于分配给 Spark 实例的数据。我将尝试分解每次执行完全相同的查询时发生的情况。

首先,Spark 中的一切都是惰性的。这意味着当您调用 时rdd.cache(),在您对 RDD 执行某些操作之前,实际上什么都不会发生。

第一次查询

  1. 全 HBase 扫描(慢)
  2. 增加分区数(导致 shuffle、慢)
  3. 数据实际上被缓存到内存中,因为 Spark 是惰性的(有点慢)
  4. 应用 where 谓词(快速)
  5. 收集结果

第二次/第三次查询

  1. 全内存扫描(快速)
  2. 应用 where 谓词(快速)
  3. 收集结果

现在,Spark 将尝试缓存尽可能多的 RDD。如果它不能缓存整个东西,你可能会遇到一些严重的减速。如果缓存之前的步骤之一导致随机播放,则尤其如此。您可能会在第一个查询中为每个后续查询重复步骤 1 - 3。这并不理想。

要查看您是否没有完全缓存 RDD,请转到您的 Spark Web UI(http://localhost:4040如果处于本地独立模式)并查找 RDD 存储/持久性信息。确保它是 100%。

编辑(根据评论):

我的 hbase 中 400,000 个数据大小只有大约 250MB。为什么我需要使用 2G 来解决问题(但 1G>>250MB)

我不能肯定地说为什么你会达到你的最大限制spark.executor.memory=1G,但我会添加一些关于缓存的更多相关信息。

  • Spark 仅将执行程序堆内存的一部分分配给缓存。默认情况下,这是spark.storage.memoryFraction=0.6或 60%。所以你真的只是得到1GB * 0.6
  • HBase 中使用的总空间可能与 Spark 中缓存时占用的总堆空间不同。默认情况下,Spark 在内存中存储时不会序列化 Java 对象。Object正因为如此,存储 Java元数据的开销很大。您可以更改默认的持久性级别

您知道如何缓存所有数据以避免第一次查询的性能不佳吗?

调用任何操作都会导致 RDD 被缓存。就这样做

scala> rdd.cache
scala> rdd.count

现在它被缓存了。

于 2014-12-26T23:55:31.707 回答
1

我希望您一次又一次地运行这些查询,如果是的话,您为什么要为每个查询创建单独的 sqlContext?你也可以尝试重新分区RDD,这将增加并行度。如果可能的话,缓存RDD。

希望以上步骤能提高性能。

于 2014-12-25T10:15:45.427 回答