0

mapPartitions我在一个松散地实现Uber 案例研究的库中使用 DataFrame 。输出 DataFrame 有一些新的(大)列,输入 DataFrame 在执行之前进行了分区和内部排序mapPartitions。大多数用户会在其他列上进行投影,然后在已经分区的列上进行聚合。这会导致昂贵的冗余洗牌,因为mapPartitions使用planWithBarrier. 我想知道在催化剂 api 中是否有一个简单的解决方案?

代码示例:

val resultDF = keysDF
    .select("key1") //non unique
    .join(mappingTable.select("key1", "key2"), "key1") //key1->key2 many to one
    .repartition($"key2")
    .sortWithinPartitions($"key1", $"key2")
    .mapPartitions(appendThreeColumns))(RowEncoder(outputSchema))
    .select("key1", "key2", "value1", "value2", "value3")

如您所见,按(注意多对一关系)进行resultDF分区并在内部排序。key1

但是resultDF.groupBy("key1").agg(count("value1")),例如,将导致交换。

欢迎任何建议。

4

1 回答 1

0

我认为您正在创建更多具有mapPartitions逻辑应用聚合操作的列,因此您会在多个执行器之间进行大量洗牌。所以Spark有一个bucketing的概念。请点击此链接。之前使用这个概念,mapPartitions然后尝试在mapPartitions. 我认为它会减少网络 I/O。

于 2019-08-01T07:10:36.973 回答