mapPartitions
我在一个松散地实现Uber 案例研究的库中使用 DataFrame 。输出 DataFrame 有一些新的(大)列,输入 DataFrame 在执行之前进行了分区和内部排序mapPartitions
。大多数用户会在其他列上进行投影,然后在已经分区的列上进行聚合。这会导致昂贵的冗余洗牌,因为mapPartitions
使用planWithBarrier
. 我想知道在催化剂 api 中是否有一个简单的解决方案?
代码示例:
val resultDF = keysDF
.select("key1") //non unique
.join(mappingTable.select("key1", "key2"), "key1") //key1->key2 many to one
.repartition($"key2")
.sortWithinPartitions($"key1", $"key2")
.mapPartitions(appendThreeColumns))(RowEncoder(outputSchema))
.select("key1", "key2", "value1", "value2", "value3")
如您所见,按(注意多对一关系)进行resultDF
分区并在内部排序。key1
但是resultDF.groupBy("key1").agg(count("value1"))
,例如,将导致交换。
欢迎任何建议。