4

最初,我有很多数据。但是使用 spark-SQL 尤其是 groupBy 可以将其缩减到可管理的大小。(适合单个节点的 RAM)

如何在所有组(分布在我的节点之间)上执行功能(并行)?

如何确保将单个组的数据收集到单个节点?例如,我可能希望local matrix用于计算,但不想遇到有关数据局部性的错误。

4

2 回答 2

2

假设你有 x 没有。执行程序(在您的情况下,每个节点可能有 1 个执行程序)。并且您希望以这样的方式对密钥上的数据进行分区,使每个密钥都落入一个独特的存储桶中,这将是一个完美的分区器。没有通用的方法这样做,但如果有一些特定于您的数据的固有分布/逻辑,则有可能实现这一点。

我处理过一个特定的案例,我发现 Spark 的内置哈希分区器在均匀分配密钥方面做得不好。所以我使用 Guava 编写了一个自定义分区器,如下所示:

  class FooPartitioner(partitions: Int) extends org.apache.spark.HashPartitioner(partitions: Int) {
    override def getPartition(key: Any): Int = {
      val hasherer = Hashing.murmur3_32().newHasher()
      Hashing.consistentHash(
        key match {
          case i: Int => hasherer.putInt(i).hash.asInt()
          case _ => key.hashCode
          },PARTITION_SIZE)
  }
 }

然后我将此分区器实例作为参数添加到我正在使用的 combineBy 中,以便以这种方式对生成的 rdd 进行分区。这可以很好地将数据分发到 x 个存储桶,但我想不能保证每个存储桶只有 1 个密钥。

如果您使用的是 Spark 1.6 并使用数据帧,您可以像这样定义一个 udf val hasher = udf((i:Int)=>Hashing.consistentHash(Hashing.murmur3_32().newHasher().putInt(i) .hash.asInt(),PARTITION_SIZE)) 并执行此操作,dataframe.repartition(hasher(keyThatYouAreUsing)) 希望这能提供一些入门提示。

于 2016-04-21T06:09:46.197 回答
1

我在 这个博客中找到了一个使用 PySpark 的 Efficient UD(A)Fs的解决方案

  1. mapPartitions 分割数据;
  2. udaf 将 spark 数据帧转换为 pandas 数据帧;
  3. 在 udaf 中执行数据 etl 逻辑并返回 pandas 数据框;
  4. udaf 将 pandas 数据帧转换为 spark 数据帧;
  5. toDF() 合并结果 spark 数据帧并像 SaveAsTable 一样持久化;

python df = df.repartition('guestid').rdd.mapPartitions(udf_calc).toDF()

于 2018-04-03T00:46:04.077 回答