我在 EMR(使用 YARN)上使用 Spark SQL v2.4.7。我编写 Spark Sql 查询来执行转换。
估计复杂查询的最佳随机分区数:
我正在尝试估计需要设置的最佳随机分区数,以便为具有多个连接的复杂查询获得最佳性能。在网上我发现分区的最佳大小应该在10 MB
-的范围内100 MB
。现在,既然我知道了这个值,我下一步就是计算查询的数据 shuffle 量(以 MB 为单位),然后除以100
得到 shuffle 分区号。但是,对于涉及多个大表连接的复杂查询,估计 shuffle 量变得极其困难。
那么我如何估计shuffle volume的数量,从而得出最佳的大查询所需的随机分区数?目前(经过大量搜索)我正在执行以下步骤 -
scala> spark.sql("""
| create temp view initial_tbl
| as
| with first_qry as
| (
| select a.id,
| a.fund_id,
| b.fname,
| c.state_nm,
| row_number() over (partition by a.id order by c.eff_dt desc) as rownum
| from tblA a
| left join tblB b
| on a.id = b.id
| left join tblC c
| on b.st_id = c.st_id
| )
| select * from first_qry
| where rownum = 1
| """)
scala> spark.sql("""
| create temp view final_tbl as
| select a.id, a.fname, a.state_nm, b.salary, c.age
| from initial_tbl a
| left join fin_dtls b
| on a.id = b.id
| and a.fund_id = b.fund_id
| left join age_dtls c
| on b.age_id = c.age_id
| union all
| select id, fname, 'NA' as state_nm, salary, age
| from another_pre_created_tbl
| """)
scala> spark.sql("""
| select * from final_tbl
| limit 50
| """)
注意:这只是实际查询的简化版本。
好的,现在,我正在尝试估计此查询的数据大小,然后我可以将其除以100 MB
得到查询的最佳随机分区数。
scala> val df = spark.read.table("final_tbl")
scala> println(df.queryExecution.analyzed.stats)
Statistics(sizeInBytes=34.5 GB, hints=none)
所以上述查询的大小是34.5 GB
当除以100 MB
给出~ 350
随机分区时。现在设置 config 后SET spark.sql.shuffle.partitions=350
,我仍然看到查询很慢。所以我的问题是——
- 我这样做对吗?否则,请让我知道如何计算复杂查询(涉及多个连接)的随机播放量,并最终能够计算任何给定复杂查询的最佳随机播放分区数。
SKEW:
对于上面提到的查询,我看到12
作业是在 Spark UI 中触发的。在 UI 中,最后一个作业显示高倾斜,即一个任务是一个长条,而其他同时执行的任务由几个非常小的条表示(我希望我可以提供 UI 屏幕截图)-所以,我的问题(基于上述) 是-
- 如何确定上述查询的哪一部分,或者具体来说,这个大型复杂查询中的哪个表/列是导致倾斜的主要原因?在一个大查询中有如此多的表和联接,通过一次联接 2 个表并检查 UI 和进度来进行测试变得非常困难和耗时。
那么有没有什么聪明的方法来找出导致倾斜的实际连接表/列? - 此外,在确定导致倾斜的表/列之后,我该如何解决这个问题,以便所有分区都有相同的数据量来处理,从而加快工作速度?
- 如何将 UI 中的特定作业(触发的火花)与查询的哪个特定部分相关联?
写入输出时倾斜:
最后,我将上述查询的输出写入S3
SQL API ( %sql%
) 为 -
create table final_out
using parquet
options (
path 's3:/my/test/location/',
mode: 'overwrite'
)
as
select * from final_tbl
distribute by id;
即使是这样,当我检查 UI 时,我也发现了像上面这样的巨大偏差,一个任务是一个非常长的条,而其他同时执行的任务是非常小的条。如果您仔细观察,您会发现上面显示的最终查询union all
与另一个具有硬编码值(即'NA' as state_nm
)的查询进行了匹配。现在,由于-ed 表中有周围的100 Million
记录,因此该值成为输出中列的主要值,从而产生偏斜,从而使写入非常慢。union
'NA'
state_nm
所以我的最后一个问题是——
- 在作为 parquet 文件(使用 sql API)写入磁盘时,如何减轻由输出中的硬编码值引起的这种偏差?请注意,我已尝试在其
repartition
PK 列(id
distribute by id
create table
我的集群配置如下:
Nodes: 20
Cores: 8
Memory: 64 GB
对于这篇长篇文章,我感到非常抱歉,但是这些问题困扰了我很长时间。我在互联网上搜索了很多,但找不到任何具体的答案。谁能帮我解决这些问题。任何帮助表示赞赏。
谢谢。