我正在使用内部连接来生成记录比较,以消除重复数据。
我想对这些连接加盐,以便记录比较在存在偏斜的情况下分布更均匀。
下面是一个非常简单的激励示例——真实的输入数据要大得多。
假设我们有一个如下表(此处为 csv )。
名 | 姓 | 城市 |
---|---|---|
查尔斯 | 狄更斯 | 伦敦 |
查理 | 狄更斯 | 伦敦 |
弗吉尼亚 | 羊毛 | 伦敦 |
弗吉尼亚 | 狼 | 伦敦 |
玛丽 | 雪莱 | 伦敦 |
简 | 奥斯汀 | 史蒂文顿 |
要生成记录比较,我可以编写如下 sql:
df = spark.read.csv()
df.createOrReplaceTempView("df")
sql = """
select
l.first_name as first_name_l,
r.first_name as first_name_r,
l.surname as surname_l,
r.surname as surname_r,
l.city as city_l,
r.city as city_r
from df as l
inner join df as r
on l.city = r.city
"""
spark.sql(sql)
在大型数据集上,spark 将选择一个SortMergeJoin
. 数据将在 HashPartitioned 上进行city
。
因此,所有 5 条记录city = london
都将在一个执行器上结束,将在该执行器上london
生成记录的笛卡尔积 - 总共 25 条记录。
这在实际数据集上产生了一个问题,其中的计数city=london
可能为 10,000 - 在单个执行器上的单个任务中生成 100,000,000 次比较。
我的问题是我怎样才能让这个加入更均匀地分配工作?. 请注意,需要生成所有 25(或 100m)条记录比较 - 只是我们希望它们在不同任务之间进行比较
我尝试过的解决方案
我有一个非常不优雅的工作解决方案,如下所示。我正在寻求改进这一点。
第 1 步:创建一个随机整数列,random_int
。为简单起见,假设它包含 1-3 范围内的整数。
步骤2:运行3次left join,并union all
select {cols}
from df as l
inner join df as r
on l.city = r.city and l.random_int = 1
UNION ALL
select {cols}
from df as l
inner join df as r
on l.city = r.city and l.random_int = 2
UNION ALL
select {cols}
from df as l
inner join df as r
on l.city = r.city and l.random_int = 3
该解决方案给出了正确的答案,并且在存在偏斜的情况下在大型数据集上运行得更快。但它在执行计划上产生了很多复杂性,我不禁感到必须有更好的方法。
这个真正的上下文问题是我的开源软件Splink的阻塞步骤。因此,提供的任何帮助都将有助于改进此软件。(当然也欢迎 PR!)