Let's start with the case where the data is loaded from a file path rather than from the metastore. In this situation, Spark first performs a recursive file listing to discover the nested partition folders and the files inside them, and it then treats those partition folders as fields available for partition pruning. As a result, if you filter on any of the partition columns, Spark selects only the partitions that satisfy that predicate. You can confirm this by calling the explain method on the query. Note PartitionCount: 1 in the following plan:
scala> input1.where("city = 'Houston'").explain()
== Physical Plan ==
*(1) FileScan parquet [id#32,state#33,city#34] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/data], PartitionCount: 1, PartitionFilters: [isnotnull(city#34), (city#34 = Houston)], PushedFilters: [], ReadSchema: struct<id:int>
Compare this with the plan for the same query without any filter, where PartitionCount: 5:
scala> input1.explain()
== Physical Plan ==
*(1) FileScan parquet [id#55,state#56,city#57] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/data], PartitionCount: 5, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
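The plans above do not show how /tmp/data and the input1 DataFrame were created. As a rough, hypothetical reproduction (the sample rows are invented; only the path and the column names id, state and city are taken from the plans), a layout with five partitions could be produced in the spark-shell like this:

// Hypothetical setup: write a small DataFrame partitioned by state and city,
// then read it back from the path, as in the plans above.
import spark.implicits._

val df = Seq(
  (1, "Texas", "Houston"),
  (2, "Texas", "Dallas"),
  (3, "California", "San Francisco"),
  (4, "California", "Los Angeles"),
  (5, "New York", "New York")
).toDF("id", "state", "city")

// partitionBy creates nested folders such as /tmp/data/state=Texas/city=Houston
df.write.mode("overwrite").partitionBy("state", "city").parquet("/tmp/data")

// Reading from the path triggers the recursive file listing (InMemoryFileIndex)
val input1 = spark.read.parquet("/tmp/data")

With five distinct (state, city) combinations in the data, the unfiltered scan reports PartitionCount: 5, matching the plan above.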
The second case is loading a partitioned table. Here the partitions are managed by the Hive metastore, which saves the expensive recursive file listing. When you filter on a partition column, Spark again selects only the relevant partitions. Note the following explain plan:
scala> input2.where("city = 'Houston'").explain()
== Physical Plan ==
*(1) FileScan parquet default.data[id#39,state#40,city#41] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[file:/tmp/data/state=Texas/city=Houston], PartitionCount: 1, PartitionFilters: [isnotnull(city#41), (city#41 = Houston)], PushedFilters: [], ReadSchema: struct<id:int>
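Again, the source does not show how input2 or the table was set up. One hypothetical way to end up with a partitioned table named default.data over the same /tmp/data directory (assuming a Hive-enabled SparkSession; only the table name, path and column names come from the plan above) is roughly:

// Hypothetical setup: register the existing partitioned directory as an
// external table so the partitions are tracked by the metastore.
spark.sql("""
  CREATE TABLE data (id INT, state STRING, city STRING)
  USING parquet
  PARTITIONED BY (state, city)
  LOCATION 'file:/tmp/data'
""")

// Discover the existing partition folders and record them in the metastore
spark.sql("MSCK REPAIR TABLE data")

// Loading via the table name uses the metastore's partition information
val input2 = spark.table("data")

Note the PrunedInMemoryFileIndex in the plan: only the single matching partition folder (state=Texas/city=Houston) is listed, instead of the whole /tmp/data tree.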