Spark needs to load the partition metadata on the driver first so that it knows which partitions exist. Spark lists the directory to find the existing partitions, which lets it determine whether partitions can be pruned while scanning the data.
I tested this on Spark 2.0, and you can see it in the log messages:
16/10/14 17:23:37 TRACE ListingFileCatalog: Listing s3a://mybucket/reddit_year on driver
16/10/14 17:23:37 TRACE ListingFileCatalog: Listing s3a://mybucket/reddit_year/year=2007 on driver
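A minimal sketch of the read that triggers this listing, assuming the path and layout shown in the logs above (the bucket and app name are just illustrative):

# Reading the root path triggers partition discovery: Spark lists the
# year=... subdirectories on the driver and infers `year` as a partition column.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-discovery").getOrCreate()

reddit = spark.read.parquet("s3a://mybucket/reddit_year")
reddit.printSchema()  # `year` appears as an inferred partition column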
This does not mean it is scanning the files inside each partition, but Spark will store the partition locations for future queries against the table.
You can see the log where it actually passes the partition filters used to prune the data:
16/10/14 17:23:48 TRACE ListingFileCatalog: Partition spec: PartitionSpec(StructType(StructField(year,IntegerType,true)),ArrayBuffer(PartitionDirectory([2012],s3a://mybucket/reddit_year/year=2012), PartitionDirectory([2010],s3a://mybucket/reddit_year/year=2010), ...PartitionDirectory([2015],s3a://mybucket/reddit_year/year=2015), PartitionDirectory([2011],s3a://mybucket/reddit_year/year=2011)))
16/10/14 17:23:48 INFO ListingFileCatalog: Selected 1 partitions out of 9, pruned 88.88888888888889% partitions.
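For context, a layout like this is typically produced by writing with partitionBy on the year column; the sketch below is an assumption about how the data was laid out, with made-up sample rows and the same illustrative path:

# Each distinct year becomes a year=<value> subdirectory; these are the
# directories that ListingFileCatalog enumerates and later prunes against.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sample = spark.createDataFrame(
    [("1414800000", "t3_abc", 10, 2014), ("1136100000", "t3_def", 3, 2007)],
    ["created_utc", "name", "score", "year"],
)

sample.write.partitionBy("year").parquet("s3a://mybucket/reddit_year")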
If you run an explain(True) on a query, you can see this in the logical plan:
spark.sql("select created_utc, score, name from reddit where year = '2014'").explain(True)
This will show you the plan, and you can see it doing the filtering at the bottom of the plan:
+- *BatchedScan parquet [created_utc#58,name#65,score#69L,year#74] Format: ParquetFormat, InputPaths: s3a://mybucket/reddit_year, PartitionFilters: [isnotnull(year#74), (cast(year#74 as double) = 2014.0)], PushedFilters: [], ReadSchema: struct<created_utc:string,name:string,score:bigint>
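The DataFrame API prunes the same way: a filter on the partition column shows up as PartitionFilters in the physical plan. This is just a sketch equivalent to the SQL query above, reusing the same column names and path:

# Filtering on the inferred partition column `year` produces the same
# PartitionFilters entry in the plan as the SQL WHERE clause above.
reddit = spark.read.parquet("s3a://mybucket/reddit_year")
(reddit
    .filter(reddit.year == 2014)
    .select("created_utc", "score", "name")
    .explain(True))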