0

是否为 apache spark 中的缓存 TempTables 启用了分区修剪?如果是这样,我该如何配置它?

我的数据是不同安装中的一堆传感器读数,一行包含安装名称、标签、时间戳和值。

我使用以下命令以镶木地板格式写入数据:

rdd.toDF("installationName", "tag", "timestamp", "value")
  .repartition($"installationName", $"tag")
  .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output)

我使用以下命令将这些数据读取到使用 Spark HiveContext 的 SQL 表中:

val parquet = hc.read.parquet("/path_to_table/tablename")
parquet.registerTempTable("tablename")

现在,如果我在该表上运行 SQL 查询,它会按预期进行分区修剪:

hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'")

查询大约需要 8 秒。但是如果我将表缓存在内存中,然后执行相同的查询,它总是需要大约 50 秒:

hc.sql("CACHE TABLE tablename")
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'")

我目前正在使用 Spark 1.6.1。

4

1 回答 1

0

发生这种情况的原因是由于 Cache 在 spark 中的工作方式。

当您向 DataFrame、RDD 或 DataSet 调用某种进程时,执行有一个计划,如下所示:

val df = sc.parallelize(1 to 10000).toDF("line")
df.withColumn("new_line", col("line") * 10).queryExecution

该命令queryExecution将计划返回给您。请参阅下面的代码逻辑计划:

== Parsed Logical Plan ==
Project [*,('line * 10) AS new_line#7]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan ==
line: int, new_line: int
Project [line#5,(line#5 * 10) AS new_line#7]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan ==
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan ==
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7]
+- Scan ExistingRDD[_1#4]

在这种情况下,您可以看到您的代码将执行的所有过程。当你调用这样的cache函数时:

 df.withColumn("new_line", col("line") * 10).cache().queryExecution

结果将是这样的:

== Parsed Logical Plan ==
'Project [*,('line * 10) AS new_line#8]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34

== Analyzed Logical Plan ==
line: int, new_line: int
Project [line#5,(line#5 * 10) AS new_line#8]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34

== Optimized Logical Plan ==
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None

== Physical Plan ==
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro...

此执行将返回给您执行优化的InMemoryRelation逻辑计划,这将在您的内存中保存数据结构,或者如果您的数据非常大,它将溢出到磁盘。

将其保存在集群中需要时间,第一次执行时会有点慢,但是当您需要再次访问其他地方的相同数据时,将保存 DF 或 RDD 并且 Spark 不会请求再次执行。

于 2017-02-10T15:37:46.433 回答