3

假设我有以下 Pyspark 数据框:

data_df = (
    spark
    .createDataFrame(
        [
            [0,0,0],[0,0,1],[0,1,2],[0,2,1],[0,0,2],[0,1,0],[0,20,21],[0,23,20],[0,21,25],[0,22,22],
            [1,100,102],[1,105,101],[1,102,102],[1,103,100],[1,1000,1000],[1,1001,1005],[1,1002,1001]
        ]
    )
)

我可以展示它:

+---+----+----+
| _1|  _2|  _3|
+---+----+----+
|  0|  20|  21|
|  1|1001|1005|
|  0|   0|   2|
|  1| 103| 100|
|  0|  23|  20|
|  1|1002|1001|
|  0|   0|   0|
|  0|  22|  22|
|  0|   0|   1|
|  0|   1|   0|
|  1| 100| 102|
|  1|1000|1000|
|  0|   1|   2|
|  1| 105| 101|
|  0|   2|   1|
|  1| 102| 102|
|  0|  21|  25|
+---+----+----+

现在,我要求重新分区:

rep_data_df = (
    data_df
    .repartition(2, "_1")
)

据我了解,这将创建 2 个分区,即根据第一列的值分布的数据。

不过,如果我打印分区数和分区结构,结果如下:

print("Number of partitions: {}".format(rep_data_df.rdd.getNumPartitions()))
print("Partitions structure: {}".format(rep_data_df.rdd.glom().collect()))

Number of partitions: 2
Partitions structure: [[], [Row(_1=0, _2=21, _3=25), Row(_1=0, _2=2, _3=1), Row(_1=1, _2=102, _3=102), Row(_1=0, _2=23, _3=20), Row(_1=1, _2=1002, _3=1001), Row(_1=0, _2=0, _3=2), Row(_1=1, _2=103, _3=100), Row(_1=0, _2=0, _3=0), Row(_1=0, _2=22, _3=22), Row(_1=0, _2=20, _3=21), Row(_1=1, _2=1001, _3=1005), Row(_1=0, _2=1, _3=2), Row(_1=1, _2=105, _3=101), Row(_1=0, _2=0, _3=1), Row(_1=1, _2=100, _3=102), Row(_1=0, _2=1, _3=0), Row(_1=1, _2=1000, _3=1000)]]

如您所见,有 2 个分区,没关系,但数据并没有像我预期的那样分区:所有数据都在一个分区内,而另一个是空的。

更奇怪的是,如果我要求 3 个分区:

Number of partitions: 3
Partitions structure: [[], [Row(_1=0, _2=1, _3=2), Row(_1=0, _2=1, _3=0), Row(_1=0, _2=0, _3=2), Row(_1=0, _2=2, _3=1), Row(_1=0, _2=23, _3=20), Row(_1=0, _2=21, _3=25), Row(_1=0, _2=0, _3=0), Row(_1=0, _2=22, _3=22), Row(_1=0, _2=0, _3=1), Row(_1=0, _2=20, _3=21)], [Row(_1=1, _2=105, _3=101), Row(_1=1, _2=1000, _3=1000), Row(_1=1, _2=102, _3=102), Row(_1=1, _2=100, _3=102), Row(_1=1, _2=103, _3=100), Row(_1=1, _2=1001, _3=1005), Row(_1=1, _2=1002, _3=1001)]]

即,我得到了一个空分区和 2 个具有预期数据分布的其他分区。

难道我做错了什么?任何人都可以解释这种行为吗?

谢谢!

编辑 1

特别好奇!0如果我将第一列中所有出现的 替换为2,一切都会按我的预期进行!

data_df = (
    spark
    .createDataFrame(
        [
            [2,0,0],[2,0,1],[2,1,2],[2,2,1],[2,0,2],[2,1,0],[2,20,21],[2,23,20],[2,21,25],[2,22,22],
            [1,100,102],[1,105,101],[1,102,102],[1,103,100],[1,1000,1000],[1,1001,1005],[1,1002,1001]
        ]
    )
)

显示它:

+---+----+----+
| _1|  _2|  _3|
+---+----+----+
|  2|   2|   1|
|  2|  23|  20|
|  2|  20|  21|
|  2|   0|   0|
|  2|  21|  25|
|  2|   0|   2|
|  2|   1|   0|
|  2|   0|   1|
|  2|  22|  22|
|  2|   1|   2|
|  1| 100| 102|
|  1|1000|1000|
|  1|1001|1005|
|  1|1002|1001|
|  1| 105| 101|
|  1| 103| 100|
|  1| 102| 102|
+---+----+----+

然后要求重新分区并检查分区:

Number of partitions: 2
Partitions structure: [[Row(_1=2, _2=22, _3=22), Row(_1=2, _2=1, _3=0), Row(_1=2, _2=0, _3=2), Row(_1=2, _2=2, _3=1), Row(_1=2, _2=0, _3=0), Row(_1=2, _2=20, _3=21), Row(_1=2, _2=0, _3=1), Row(_1=2, _2=21, _3=25), Row(_1=2, _2=1, _3=2), Row(_1=2, _2=23, _3=20)], [Row(_1=1, _2=100, _3=102), Row(_1=1, _2=105, _3=101), Row(_1=1, _2=1001, _3=1005), Row(_1=1, _2=1002, _3=1001), Row(_1=1, _2=103, _3=100), Row(_1=1, _2=102, _3=102), Row(_1=1, _2=1000, _3=1000)]]

值有什么问题0?xD

这是一个错误吗?

4

0 回答 0