2

我正在使用内部连接来生成记录比较,以消除重复数据

我想对这些连接加盐,以便记录比较在存在偏斜的情况下分布更均匀。

下面是一个非常简单的激励示例——真实的输入数据要大得多。

假设我们有一个如下表(此处为 csv )。

城市
查尔斯 狄更斯 伦敦
查理 狄更斯 伦敦
弗吉尼亚 羊毛 伦敦
弗吉尼亚 伦敦
玛丽 雪莱 伦敦
奥斯汀 史蒂文顿

要生成记录比较,我可以编写如下 sql:

df = spark.read.csv()
df.createOrReplaceTempView("df")

sql = """
select 
  l.first_name as first_name_l,
  r.first_name as first_name_r,
  l.surname as surname_l, 
  r.surname as surname_r,
  l.city as city_l,
  r.city as city_r
from df as l
inner join df as r
on l.city = r.city
"""
spark.sql(sql)

在大型数据集上,spark 将选择一个SortMergeJoin. 数据将在 HashPartitioned 上进行city

因此,所有 5 条记录city = london都将在一个执行器上结束,将在该执行器上london生成记录的笛卡尔积 - 总共 25 条记录。

这在实际数据集上产生了一个问题,其中的计数city=london可能为 10,000 - 在单个执行器上的单个任务中生成 100,000,000 次比较。

我的问题是我怎样才能让这个加入更均匀地分配工作?. 请注意,需要生成所有 25(或 100m)条记录比较 - 只是我们希望它们在不同任务之间进行比较

我尝试过的解决方案

我有一个非常不优雅的工作解决方案,如下所示。我正在寻求改进这一点。

第 1 步:创建一个随机整数列,random_int。为简单起见,假设它包含 1-3 范围内的整数。

步骤2:运行3次left join,并union all

select {cols}
from df as l
inner join df as r
on l.city = r.city and l.random_int = 1

UNION ALL

select {cols}
from df as l
inner join df as r
on l.city = r.city and l.random_int = 2

UNION ALL

select {cols}
from df as l
inner join df as r
on l.city = r.city and l.random_int = 3

该解决方案给出了正确的答案,并且在存在偏斜的情况下在大型数据集上运行得更快。但它在执行计划上产生了很多复杂性,我不禁感到必须有更好的方法。

这个真正的上下文问题是我的开源软件Splink的阻塞步骤。因此,提供的任何帮助都将有助于改进此软件。(当然也欢迎 PR!)

4

0 回答 0