0

我在 EMR(使用 YARN)上使用 Spark SQL v2.4.7。我编写 Spark Sql 查询来执行转换。

估计复杂查询的最佳随机分区数
我正在尝试估计需要设置的最佳随机分区数,以便为具有多个连接的复杂查询获得最佳性能。在网上我发现分区的最佳大小应该在10 MB-的范围内100 MB。现在,既然我知道了这个值,我下一步就是计算查询的数据 shuffle 量(以 MB 为单位),然后除以100得到 shuffle 分区号。但是,对于涉及多个大表连接的复杂查询,估计 shuffle 量变得极其困难。
那么我如何估计shuffle volume的数量,从而得出最佳的大查询所需的随机分区数?目前(经过大量搜索)我正在执行以下步骤 -

scala> spark.sql("""
       | create temp view initial_tbl
       | as
       | with first_qry as 
       | (
       | select a.id, 
       | a.fund_id,
       | b.fname, 
       | c.state_nm, 
       | row_number() over (partition by a.id order by c.eff_dt desc) as rownum
       | from tblA a
       | left join tblB b
       | on a.id = b.id
       | left join tblC c
       | on b.st_id = c.st_id
       | )
       | select * from first_qry
       | where rownum = 1
       | """)

scala> spark.sql("""
       | create temp view final_tbl as
       | select a.id, a.fname, a.state_nm, b.salary, c.age
       | from initial_tbl a
       | left join fin_dtls b
       | on a.id = b.id
       | and a.fund_id = b.fund_id
       | left join age_dtls c
       | on b.age_id = c.age_id
       | union all
       | select id, fname, 'NA' as state_nm, salary, age
       | from another_pre_created_tbl
       | """)
scala> spark.sql("""
       | select * from final_tbl 
       | limit 50
       | """)

注意:这只是实际查询的简化版本。

好的,现在,我正在尝试估计此查询的数据大小,然后我可以将其除以100 MB得到查询的最佳随机分区数。

scala> val df = spark.read.table("final_tbl")
scala> println(df.queryExecution.analyzed.stats)
Statistics(sizeInBytes=34.5 GB, hints=none)

所以上述查询的大小是34.5 GB当除以100 MB给出~ 350随机分区时。现在设置 config 后SET spark.sql.shuffle.partitions=350,我仍然看到查询很慢。所以我的问题是——

  • 我这样做对吗?否则,请让我知道如何计算复杂查询(涉及多个连接)的随机播放量,并最终能够计算任何给定复杂查询的最佳随机播放分区数。

SKEW
对于上面提到的查询,我看到12作业是在 Spark UI 中触发的。在 UI 中,最后一个作业显示高倾斜,即一个任务是一个长条,而其他同时执行的任务由几个非常小的条表示(我希望我可以提供 UI 屏幕截图)-所以,我的问题(基于上述) 是-

  • 如何确定上述查询的哪一部分,或者具体来说,这个大型复杂查询中的哪个表/列是导致倾斜的主要原因?在一个大查询中有如此多的表和联接,通过一次联接 2 个表并检查 UI 和进度来进行测试变得非常困难和耗时。
    那么有没有什么聪明的方法来找出导致倾斜的实际连接表/列?
  • 此外,在确定导致倾斜的表/列之后,我该如何解决这个问题,以便所有分区都有相同的数据量来处理,从而加快工作速度?
  • 如何将 UI 中的特定作业(触发的火花)与查询的哪个特定部分相关联?

写入输出时倾斜:
最后,我将上述查询的输出写入S3SQL API ( %sql%) 为 -

create table final_out
using parquet
options (
         path 's3:/my/test/location/',
         mode: 'overwrite'
        )
as
select * from final_tbl
distribute by id;

即使是这样,当我检查 UI 时,我也发现了像上面这样的巨大偏差,一个任务是一个非常长的条,而其他同时执行的任务是非常小的条。如果您仔细观察,您会发现上面显示的最终查询union all与另一个具有硬编码值(即'NA' as state_nm)的查询进行了匹配。现在,由于-ed 表中有周围的100 Million记录,因此该值成为输出中列的主要值,从而产生偏斜,从而使写入非常慢。union'NA'state_nm

所以我的最后一个问题是——

  • 在作为 parquet 文件(使用 sql API)写入磁盘时,如何减轻由输出中的硬编码值引起的这种偏差?请注意,我已尝试在其repartitionPK 列(iddistribute by idcreate table

我的集群配置如下:

Nodes: 20
Cores:  8
Memory: 64 GB

对于这篇长篇文章,我感到非常抱歉,但是这些问题困扰了我很长时间。我在互联网上搜索了很多,但找不到任何具体的答案。谁能帮我解决这些问题。任何帮助表示赞赏。

谢谢。

4

1 回答 1

0

无法回答你所有的问题,但我可以分享一些想法,因为我遇到了一些问题:

如何确定上述查询的哪一部分,或者具体来说,这个大型复杂查询中的哪个表/列是导致倾斜的主要原因?

您可以列出查询中的所有表,对用于连接它们的列进行计数,并查看哪些值表示行的超大部分。如果您想自动执行此操作,请使用 pandas 分析器或出色的期望库来自动生成每个表的列摘要。

我这样做对吗?否则,请让我知道如何计算复杂查询(涉及多个连接)的随机播放量,并最终能够计算任何给定复杂查询的最佳随机播放分区数。

我不确定在随机分区设置方面还有更多工作要做,唯一想到的是在执行查询之前计算最大表的大小,并使用它来动态计算/估计随机分区数spark.conf.set("spark.sql.shuffle.partitions", calculatedNumber),但我不相信这会解决/帮助。

根据我的经验,更大的好处应该来自缓存多次使用的表、广播较小的表以及在运行查询之前在连接列上对较大的数据帧进行分区。

至于写作,我怀疑不是写 itef 会使过程变慢,而是你在写之前执行最终查询的整个计算(延迟执行),这需要大部分时间。

于 2022-01-31T11:44:20.503 回答