基于 Vikrant 的回答,这是一种更通用的方法,可以直接从表元数据中提取分区列值,从而避免 Spark 扫描表中的所有文件。
首先,如果您的数据尚未在目录中注册,您需要这样做,以便 Spark 可以查看分区详细信息。在这里,我正在注册一个名为data
.
spark.catalog.createTable(
'data',
path='/path/to/the/data',
source='parquet',
)
spark.catalog.recoverPartitions('data')
partitions = spark.sql('show partitions data')
但是,为了显示一个独立的答案,我将手动创建partitions
DataFrame,以便您可以看到它的外观,以及从中提取特定列值的解决方案。
from pyspark.sql.functions import (
col,
regexp_extract,
)
partitions = (
spark.createDataFrame(
[
('/country=usa/region=ri/',),
('/country=usa/region=ma/',),
('/country=russia/region=siberia/',),
],
schema=['partition'],
)
)
partition_name = 'country'
(
partitions
.select(
'partition',
regexp_extract(
col('partition'),
pattern=r'(\/|^){}=(\S+?)(\/|$)'.format(partition_name),
idx=2,
).alias(partition_name),
)
.show(truncate=False)
)
此查询的输出是:
+-------------------------------+-------+
|partition |country|
+-------------------------------+-------+
|/country=usa/region=ri/ |usa |
|/country=usa/region=ma/ |usa |
|/country=russia/region=siberia/|russia |
+-------------------------------+-------+
Scala 中的解决方案看起来与此非常相似,只是调用regexp_extract()
看起来略有不同:
.select(
regexp_extract(
col("partition"),
exp=s"(\\/|^)${partitionName}=(\\S+?)(\\/|$$)",
groupIdx=2
).alias(partitionName).as[String]
)
同样,以这种方式查询分区值的好处是 Spark 不会扫描表中的所有文件来为您提供答案。如果您的表格中包含数万或数十万个文件,那么您的时间节省将非常可观。