任务数量 = 存储桶数量可能是 Spark 中存储桶中最重要且讨论不足的方面。存储桶(默认情况下)在历史上仅用于创建可以优化大型连接的“预混洗”数据帧。当您读取分桶表时,每个存储桶的所有文件或文件都由单个 spark 执行程序读取(读取数据时 30 个桶 = 30 个 spark 任务),这将允许该表连接到另一个存储在同一桶上的表 #的列。我发现这种行为很烦人,就像上面提到的用户可能会增长的表一样。
您现在可能会问自己,我为什么以及何时想要存储桶,以及我的真实数据何时会随着时间的推移以完全相同的方式增长?(老实说,您可能按日期对大数据进行分区)根据我的经验,您可能没有很好的用例来以默认的 spark 方式存储表。但是所有的东西都不会因为铲斗而丢失!
输入“桶修剪”。Bucket pruning 仅在您存储 ONE 列时有效,但自 SparkSQL 和 Dataframes 出现以来,它可能是您在 Spark 中最好的朋友。它允许 Spark 根据查询中的某些过滤器来确定表中的哪些文件包含特定值,这可以大大减少 spark 物理读取的文件数量,从而实现非常高效和快速的查询。(我将 2 小时以上的查询减少到 2 分钟和 1/100 的 Spark 工作人员)。但是您可能并不在意,因为桶数到任务的问题意味着如果每个桶、每个分区的文件太多,您的表将永远不会“扩展”。
进入 Spark 3.2.0。当您禁用基于存储桶的读取时,将允许存储桶修剪保持活动状态,从而允许您通过存储桶修剪/扫描分配火花读取。我还有一个技巧可以用 spark < 3.2 来做这件事,如下所示。(请注意,在 s3 上使用 vanilla spark.read 对文件进行叶子扫描会增加开销,但如果您的表很大,那没关系,因为您的存储桶优化表将是跨所有可用 spark 工作人员的分布式读取,现在将是可扩展)
val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD,FilePartition}
val df = spark.table(tablename).where((col(partition_col)===lit(target_partition)) && (col(bucket_col)===lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
val bucket_files = for
{ FilePartition(bucketId, files) <- rdd.filePartitions f <- files }
yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("
.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))