
I recently read an article describing how to custom-partition a dataframe [ https://dataninjago.com/2019/06/01/create-custom-partitioner-for-spark-dataframe/ ] in which the author illustrates the technique in Python. I work in Scala, and the technique looked like a good way to address skew, so I tried something similar. What I found is that when one does the following:

- create 2 data frames, D1, D2
- convert D1, D2 to 2 Pair RDDs R1,R2 
    (where the key is the key you want to join on)
- repartition R1,R2 with a custom partitioner 'C'
    where 'C' has 2 partitions (p-0, p-1) and
    puts everything in p-1, except keys == 'a'
- join R1,R2 as R3
- OBSERVE that:
    - the partitioner for R3 is 'C' (same as for R1,R2)
    - when printing the contents of each partition of R3, all entries
      except the one keyed by 'a' are in p-1
- set D1' <- R1.toDF 
- set D2' <- R2.toDF 

we observe the following results:

0) The join of D1' and D2' produces the expected results (good)
1) The partitioners for D1' and D2' are None -- not Some(C),
   as was the case with the RDDs R1/R2 (bad)
2) The contents of the glom'd underlying RDDs of D1' and D2' do not
   have everything (except key 'a') piled up in partition 1
   as expected (bad)

So I came to the following conclusion... which works for me in practice... but I can't understand the behavior described in the Python-based article, and that irritates me:

When one needs to use custom partitioning with Dataframes in Scala, one must
drop down to RDDs, do the join or whatever other operation on the RDD, and then
convert back to a dataframe. You can't apply the custom partitioner, convert back
to a dataframe, do your operations, and expect the custom partitioning to be preserved.
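
To make that conclusion concrete, here is a minimal sketch of the workaround that works for me, reusing the names (empPairRdd, deptPairRdd, dumbPartitioner, Emp, Dept, spark) defined in the full Scala program further below: the join is performed while still at the RDD level, where the custom partitioner is honored, and only the finished result is converted back to a dataframe.

// Sketch only: empPairRdd, deptPairRdd, dumbPartitioner, Emp, Dept and spark
// are the ones defined in the full program below.
val cpEmp:  RDD[(String, Emp)]  = empPairRdd.partitionBy(dumbPartitioner)
val cpDept: RDD[(String, Dept)] = deptPairRdd.partitionBy(dumbPartitioner)

// The RDD-level join preserves the custom partitioner...
val joinedRdd: RDD[(String, (Emp, Dept))] = cpEmp.join(cpDept)

// ...and only the finished result is converted back to a dataframe.
import spark.implicits._
val joinedBackAsDf: DataFrame = joinedRdd
  .map { case (deptId, (emp, dept)) => (deptId, emp.name, dept.name) }
  .toDF("deptId", "empName", "deptName")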

Now... I hope I am wrong! Maybe someone with more expertise in Spark internals can set me straight here. I wrote a small program (below) to illustrate the results. Thanks in advance if you can put me on the right track.

UPDATE

In addition to the Spark code illustrating the problem, I also tried a simplified version of what the original article does in Python. The transformations below create a dataframe, extract its underlying RDD and repartition it, then recover the dataframe and verify that the partitioner has been lost.

Python snippet illustrating the problem

from pyspark.sql.types import IntegerType

mylist = [1, 2, 3, 4]
df = spark.createDataFrame(mylist, IntegerType())

def travelGroupPartitioner(key):
    return 0

dfRDD = df.rdd.map(lambda x: (x[0], x))
dfRDD2 = dfRDD.partitionBy(8, travelGroupPartitioner)
# this line uses the approach of the original article and maps to only the value,
# but map doesn't guarantee preserving the partitioner, so I also tried without
# the map below...
df2 = spark.createDataFrame(dfRDD2.map(lambda x: x[1]))
print(df2.rdd.partitioner)  # prints None

# create a dataframe from the partitioned RDD _without_ the map,
# and we _still_ lose the partitioner
df3 = spark.createDataFrame(dfRDD2)
print(df3.rdd.partitioner)  # prints None

Scala snippet illustrating the problem

import org.apache.spark.{Partitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object Question extends App {

  val conf =
    new SparkConf().setAppName("blah").
      setMaster("local").set("spark.sql.shuffle.partitions", "2")
  val sparkSession = SparkSession.builder.config(conf).getOrCreate()
  val spark = sparkSession

  import spark.implicits._
  sparkSession.sparkContext.setLogLevel("ERROR")

  class CustomPartitioner(num: Int) extends Partitioner {
    def numPartitions: Int = num
    def getPartition(key: Any): Int = if (key.toString == "a") 0 else 1
  }

  case class Emp(name: String, deptId: String)
  case class Dept(deptId: String, name: String)

  val value: RDD[Emp] = spark.sparkContext.parallelize(
    Seq(
      Emp("anne", "a"),
      Emp("dave", "d"),
      Emp("claire", "c"),
      Emp("roy", "r"),
      Emp("bob", "b"),
      Emp("zelda", "z"),
      Emp("moe", "m")
    )
  )
  val employee: Dataset[Emp] = value.toDS()
  val department: Dataset[Dept] = spark.sparkContext.parallelize(
    Seq(
      Dept("a", "ant dept"),
      Dept("d", "duck dept"),
      Dept("c", "cat dept"),
      Dept("r", "rabbit dept"),
      Dept("b", "badger dept"),
      Dept("z", "zebra dept"),
      Dept("m", "mouse dept")
    )
  ).toDS()


  val dumbPartitioner: Partitioner = new CustomPartitioner(2)

  // Convert the to-be-joined dataframes to pair RDDs, then repartition them with the custom partitioner [ prefix: cp ]
  //
  val deptPairRdd: RDD[(String, Dept)] = department.rdd.map { dept => (dept.deptId, dept) }
  val empPairRdd: RDD[(String, Emp)] = employee.rdd.map { emp: Emp => (emp.deptId, emp) }

  val cpEmpRdd: RDD[(String, Emp)] = empPairRdd.partitionBy(dumbPartitioner)
  val cpDeptRdd: RDD[(String, Dept)] = deptPairRdd.partitionBy(dumbPartitioner)

  assert(cpEmpRdd.partitioner.get == dumbPartitioner)
  assert(cpDeptRdd.partitioner.get == dumbPartitioner)

  // Here we join using RDDs and ensure that the resultant rdd is partitioned so most things end up in partition 1
  val joined: RDD[(String, (Emp, Dept))] = cpEmpRdd.join(cpDeptRdd)
  val reso: Array[(Array[(String, (Emp, Dept))], Int)] = joined.glom().collect().zipWithIndex
  reso.foreach((item: Tuple2[Array[(String, (Emp, Dept))], Int]) => println(s"partition ${item._2}. contents: ${item._1.toList}"))

  System.out.println("partitioner of RDD created by joining 2 RDD's w/ custom partitioner: " + joined.partitioner)
  assert(joined.partitioner.contains(dumbPartitioner))

  val recoveredDeptDF: DataFrame = cpDeptRdd.toDF
  val recoveredEmpDF: DataFrame = cpEmpRdd.toDF

  System.out.println(
    "partitioner for DF recovered from custom partitioned RDD (not as expected!):" +
      recoveredDeptDF.rdd.partitioner)
  val joinedDf = recoveredEmpDF.join(recoveredDeptDF, "_1")
  println("printing results of joining the 2 dataframes we 'recovered' from the custom partitioned RDDS (looks good)")
  joinedDf.show()

  println("PRINTING partitions of joined DF does not match the glom'd results we got from underlying RDDs")
  joinedDf.rdd.glom().collect().
    zipWithIndex.foreach {
    item: Tuple2[Any, Int] =>
      val asList = item._1.asInstanceOf[Array[org.apache.spark.sql.Row]].toList
      println(s"array size: ${item._2}. contents: $asList")
  }

  assert(joinedDf.rdd.partitioner.contains(dumbPartitioner))  // this will fail ;^(
}

2 Answers


Take a look at my new library, which adds a partitionBy method at the Dataset/Dataframe API level.

Using your Emp and Dept objects as an example:

class DeptByIdPartitioner extends TypedPartitioner[Dept] {
  override def getPartitionIdx(value: Dept): Int = if (value.deptId.startsWith("a")) 0 else 1
  override def numPartitions: Int = 2
  override def partitionKeys: Option[Set[PartitionKey]] = Some(Set(("deptId", StringType)))
}

class EmpByDepIdPartitioner extends TypedPartitioner[Emp] {
  override def getPartitionIdx(value: Emp): Int = if (value.deptId.startsWith("a")) 0 else 1
  override def numPartitions: Int = 2
  override def partitionKeys: Option[Set[PartitionKey]] = Some(Set(("deptId", StringType)))
}

Note that we are extending TypedPartitioner.
It is compile-time safe: you will not be able to repartition a dataset of persons with an emp partitioner.
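
To illustrate the compile-time safety claim, a (hypothetical) misuse is shown below as a comment only; since DeptByIdPartitioner is a TypedPartitioner[Dept] rather than a TypedPartitioner[Emp], applying it to the employee Dataset should be rejected by the compiler instead of failing at runtime.

// Hypothetical misuse, kept commented out because it should not compile:
// 'employee' is a Dataset[Emp], but DeptByIdPartitioner is typed for Dept.
// val wrong = employee.repartitionBy(new DeptByIdPartitioner)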

val spark = SparkBuilder.getSpark()

import org.apache.spark.sql.exchange.implicits._  // <-- additional import
import spark.implicits._

val deptPartitioned = department.repartitionBy(new DeptByIdPartitioner)
val empPartitioned  = employee.repartitionBy(new EmpByDepIdPartitioner)

Let's check how our data is partitioned:

Dep dataset:
Partition N 0
    : List([a,ant dept])
Partition N 1
    : List([d,duck dept], [c,cat dept], [r,rabbit dept], [b,badger dept], [z,zebra dept], [m,mouse dept])
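
For reference, a partition dump like the one above can be produced with the same glom-based inspection used below for the joined result (a sketch, assuming the deptPartitioned Dataset from the snippet above):

deptPartitioned.rdd.glom().collect().zipWithIndex.foreach { case (rows, idx) =>
  println(s"Partition N $idx")
  println(s"\t: ${rows.toList}")
}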

If we join datasets repartitioned by the same key, Catalyst will correctly recognize this:

val joined = deptPartitioned.join(empPartitioned, "deptId")

println("Joined:")
val result: Array[(Int, Array[Row])] = joined.rdd.glom().collect().zipWithIndex.map(_.swap)
for (elem <- result) {
  println(s"Partition N ${elem._1}")
  println(s"\t: ${elem._2.toList}")
}

Partition N 0
    : List([a,ant dept,anne])
Partition N 1
    : List([b,badger dept,bob], [c,cat dept,claire], [d,duck dept,dave], [m,mouse dept,moe], [r,rabbit dept,roy], [z,zebra dept,zelda])
Answered 2019-12-04T12:36:08.723

What version of Spark are you using? If it is 2.x or above, it is recommended to use the Dataframe/Dataset API instead of RDDs.

These APIs are easier to work with than RDDs and perform better on later versions of Spark.

You may find the link below useful for how to join DFs: How to join two dataframes in Scala and select a few columns from the dataframes by their index?

Once you have the joined DataFrame, you can use the link below to partition by column values, which I assume is what you are trying to achieve: Partition a spark dataframe based on column value?
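
For what it is worth, here is a minimal sketch of what this answer suggests, reusing the employee and department Datasets from the question; note that the column-based repartition below uses Spark's standard hash partitioning on that column, not the custom Partitioner from the question.

import org.apache.spark.sql.functions.col

// Join at the Dataset/DataFrame level, then hash-partition the result by column value.
val joinedByDeptId = employee.join(department, "deptId")
val partitionedByDeptId = joinedByDeptId.repartition(col("deptId"))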

Answered 2019-08-10T13:41:05.503