9

在测试生产用例时,我创建并保存(使用 Hive Metastore)这样的表:

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets

我正在运行这样一个查询(在伪代码中)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)

常识说这个连接应该简单地用一个没有交换的排序合并连接来完成;然而spark做了一个交换然后加入。

即使对于这个特定的用例,我也可以按两个键进行存储,但由于其他一些用例,我需要按 key1 存储。当我使用这样的单个键进行(更简单)连接时:

table1.join(table2, [“key1”])

它按预期工作(即排序合并加入没有交换)。

现在我对这些表进行了优化连接,如果我想过滤,如下所示:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))

它恢复到交换然后加入。

当连接键是bucketBy键的超集时,如何说服spark不要进行交换?

笔记:

我知道的一个技巧是,如果我将重写为不等式检查,而不是相等性检查,spark 不会洗牌。

(x == y) 也可以表示为 ((x >= y) & (x <= y))。如果我在最后一个示例中应用两个这样的过滤器:

.filter(table1.col(“key2”) >= table2.col(“key2”))

.filter(table1.col(“key2”) <= table2.col(“key2”))

它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。

4

4 回答 4

5

根据一些研究和探索,这似乎是最简单的解决方案:

基于此示例:

table1.join(table2, [“key1”])
      .filter(table1.col(“key2”) == table2.col(“key2”))

而不是使用equalTo (==)来自 Spark 的,实现自定义MyEqualTo(通过委托给 sparkEqualTo实现很好)似乎解决了这个问题。这样,spark 不会优化(!)连接,它只会将过滤器拉到 SortMergeJoin 中。

类似地,连接条件也可以这样形成:

(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
于 2019-07-23T09:23:09.223 回答
1

我面临同样的问题。好像有个PR完了,正好解决了这个问题

(公关)https://github.com/apache/spark/pull/19054

(Jira 票)https://issues.apache.org/jira/browse/SPARK-18067

但我原以为它已经包含在内(我使用的是 Spark 3.0.0,问题仍然存在,而票证已于 2019 年 5 月 21 日解决,比 Spark3 发布早一年多)。

Thanks for the "hack" using inequality operators, doesn't feel great but it's an easy workaround. I will try to patch my spark version with the solution in the PR as well, but this is less sustainable/reproducable if I want to share my code.

于 2020-08-31T18:44:06.720 回答
0

org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin是通过 Join 推送谓词的优化器规则。~~
我们可以从优化器规则中排除这条规则。这样我们就不必对用户代码进行任何更改。
要排除,我们可以执行以下操作之一
1. --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin.
2. 在 spark-defaults .conf 中添加属性。
3.添加set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin到用户代码。

同样,这又是一个 hack。.
理想情况下,过滤器应该通过 join 下推,这样可以减少要连接的行数

更新:
1. 下推我错了。由于谓词具有来自两个表的列,因此不会有过滤器下推。
2.为什么当where子句有“不相等”谓词时 SortMergeJoin(SMJ) 不添加额外的交换?
这是因为 SMJ 只能将基于等式的谓词视为连接条件org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply的一部分

并且负责添加交换的 EnsureRequirements 看到 SMJ 没有新的加入条件并且输出分布已经得到满足。
代码:org.apache.spark.sql.execution.exchange.EnsureRequirements#ensureDistributionAndOrdering
3.哪个是有效的 - 添加一个执行等于或将谓词表示为大于和小于组合的 UDF?.
为了评估这一点,我使用以下方法检查了生成的代码,

val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen

一种。UDF 等于 - 涉及虚函数调用的额外开销。
湾。小于和大于的组合 - 没有虚函数调用。一旦我们找到一个连接的行(使用 key1),代码就会一一检查其他谓词。

从上述 3 中的观察来看,使用基于非等式的谓词似乎更有效。

于 2019-07-24T07:46:08.100 回答
0

**基于您的伪代码**

table1.join(table2, [“key1”, “key2”]) .groupBy(“value2”) .countUnique(“key1”)

我想解决方案是

作为第一步,只需加入表格并获取数据框。

df = table1.join(table2, [“key1”, “key2”])

然后分组并进行不同的计数

df.select(“value2”,“key1”).distinct().groupBy(“value2”,“key1”).count().show()
于 2019-07-25T20:25:52.077 回答