2

Spark 版本 - 2.2.1。

我创建了一个有 64 个桶的桶表,我正在执行一个聚合函数select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa。我可以看到 Spark UI 中有 64 个任务,它们仅使用 20 个执行程序(每个执行程序有 16 个内核)。有没有办法可以扩展任务数量,或者这就是分桶查询应该运行的方式(运行内核的数量作为桶的数量)?

这是创建表:

sql("""CREATE TABLE level_1 (
 bundle string,
  date_ date,
 hour SMALLINT)
 USING ORC
 PARTITIONED BY (date_ , hour )
 CLUSTERED BY (ifa)
 SORTED BY (ifa)
 INTO 64 BUCKETS
 LOCATION 'XXX'""")

这是查询:

sql(s"select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa").show
4

2 回答 2

2

使用分桶,任务数 == 桶数,因此您应该知道您需要/想要使用的核心/任务数,然后将其设置为桶数。

于 2018-01-22T10:25:27.820 回答
1

任务数量 = 存储桶数量可能是 Spark 中存储桶中最重要且讨论不足的方面。存储桶(默认情况下)在历史上仅用于创建可以优化大型连接的“预混洗”数据帧。当您读取分桶表时,每个存储桶的所有文件或文件都由单个 spark 执行程序读取(读取数据时 30 个桶 = 30 个 spark 任务),这将允许该表连接到另一个存储在同一桶上的表 #的列。我发现这种行为很烦人,就像上面提到的用户可能会增长的表一样。

您现在可能会问自己,我为什么以及何时想要存储桶,以及我的真实数据何时会随着时间的推移以完全相同的方式增长?(老实说,您可能按日期对大数据进行分区)根据我的经验,您可能没有很好的用例来以默认的 spark 方式存储表。但是所有的东西都不会因为铲斗而丢失!

输入“桶修剪”。Bucket pruning 仅在您存储 ONE 列时有效,但自 SparkSQL 和 Dataframes 出现以来,它可能是您在 Spark 中最好的朋友。它允许 Spark 根据查询中的某些过滤器来确定表中的哪些文件包含特定值,这可以大大减少 spark 物理读取的文件数量,从而实现非常高效和快速的查询。(我将 2 小时以上的查询减少到 2 分钟和 1/100 的 Spark 工作人员)。但是您可能并不在意,因为桶数到任务的问题意味着如果每个桶、每个分区的文件太多,您的表将永远不会“扩展”。

进入 Spark 3.2.0。当您禁用基于存储桶的读取时,将允许存储桶修剪保持活动状态,从而允许您通过存储桶修剪/扫描分配火花读取。我还有一个技巧可以用 spark < 3.2 来做这件事,如下所示。(请注意,在 s3 上使用 vanilla spark.read 对文件进行叶子扫描会增加开销,但如果您的表很大,那没关系,因为您的存储桶优化表将是跨所有可用 spark 工作人员的分布式读取,现在将是可扩展)

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD,FilePartition}


val df = spark.table(tablename).where((col(partition_col)===lit(target_partition)) && (col(bucket_col)===lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
val bucket_files = for

{ FilePartition(bucketId, files) <- rdd.filePartitions f <- files }
yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("
.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))
于 2021-08-20T22:27:46.367 回答