在测试生产用例时,我创建并保存(使用 Hive Metastore)这样的表:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
我正在运行这样一个查询(在伪代码中)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
常识说这个连接应该简单地用一个没有交换的排序合并连接来完成;然而spark做了一个交换然后加入。
即使对于这个特定的用例,我也可以按两个键进行存储,但由于其他一些用例,我需要按 key1 存储。当我使用这样的单个键进行(更简单)连接时:
table1.join(table2, [“key1”])
它按预期工作(即排序合并加入没有交换)。
现在我对这些表进行了优化连接,如果我想过滤,如下所示:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
它恢复到交换然后加入。
当连接键是bucketBy键的超集时,如何说服spark不要进行交换?
笔记:
我知道的一个技巧是,如果我将重写为不等式检查,而不是相等性检查,spark 不会洗牌。
(x == y) 也可以表示为 ((x >= y) & (x <= y))。如果我在最后一个示例中应用两个这样的过滤器:
.filter(table1.col(“key2”) >= table2.col(“key2”))
.filter(table1.col(“key2”) <= table2.col(“key2”))
它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。