2

我想像这样通过 Spark 执行 SQL。

sparkSession.sql("select * from table")

但我想在执行避免全扫描之前对表进行分区检查。

如果表是分区表,我的程序会强制用户添加分区过滤器。如果没有,可以运行。

所以我的问题是如何知道一个表是否是一个分区表?我的想法是从 Metastore 中读取信息。但是如何获取 Metastore 是我遇到的另一个问题。有人可以帮忙吗?

4

3 回答 3

2

假设您的真正目标是限制无界查询的执行,我认为获取查询的执行计划并在其FileScan/HiveTableScan叶节点下查看是否正在应用任何分区过滤器会更容易。顺便说一下,对于分区表,查询实际要扫描的分区数也会显示出来。所以,这样的事情应该做:

scala> val df_unbound = spark.sql("select * from hottab")
df_unbound: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]

scala> val plan1 = df_unbound.queryExecution.executedPlan.toString
plan1: String =
"*(1) FileScan parquet default.hottab[id#0,descr#1,loaddate#2] Batched: true, Format: Parquet, 
Location: CatalogFileIndex[hdfs://ns1/user/hive/warehouse/hottab], 
PartitionCount: 365, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"

scala> val df_filtered = spark.sql("select * from hottab where loaddate='2019-07-31'")
df_filtered: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]

scala> val plan2 = df_filtered.queryExecution.executedPlan.toString
plan2: String =
"*(1) FileScan parquet default.hottab[id#17,descr#18,loaddate#19] Batched: true, Format: Parquet, 
Location: PrunedInMemoryFileIndex[hdfs://ns1/user/hive/warehouse/hottab/loaddate=2019-07-31], 
PartitionCount: 1, PartitionFilters: [isnotnull(loaddate#19), (loaddate#19 = 2019-07-31)], 
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"

这样,您也不必处理 SQL 解析来从查询中查找表名,并自己询问元存储。

作为奖励,除了分区修剪之外,您还可以查看是否发生“常规”过滤器下推(对于支持它的存储格式)。

于 2019-07-31T15:37:47.400 回答
0

您可以使用 Scala 的Try类并show partitions在所需的表上执行。

val numPartitions = Try(spark.sql("show partitions database.table").count) match {
    case Success(v) => v
    case Failure(e) => -1
}

以后可以查numPartitions。如果值为,-1则表未分区。

于 2019-07-31T12:31:35.050 回答
0
  val listPartitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier("table_name", Some("db name")))
  listPartitions: Seq[String] = ArrayBuffer(partition1=value1, ... )  // partition table
  listPartitions: Seq[String] = ArrayBuffer() // not partition table
于 2021-12-06T14:08:25.787 回答