3

我在 Spark (v2.1.1) 中有一个包含分层数据的 3 列(如下所示)的数据集。

  • 我的目标是根据父子层次结构为每一行分配增量编号。从图形上可以说,分层数据是树的集合。
  • 根据下表,我已经根据“Global_ID”对行进行了分组。现在我想以增量顺序生成“值”列,但基于“父”和“子”列的数据层次结构。

表格表示(值是所需的输出):

    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |      Current Dataset       |         |      Desired Dataset (Output)      |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    | Global_ID | Parent | Child |         | Global_ID | Parent | Child | Value |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |       111 |    111 |   123 |         |       111 |    111 |   111 |     1 |
    |       111 |    135 |   246 |         |       111 |    111 |   123 |     2 |
    |       111 |    123 |   456 |         |       111 |    123 |   789 |     3 |
    |       111 |    123 |   789 |         |       111 |    123 |   456 |     4 |
    |       111 |    111 |   111 |         |       111 |    111 |   135 |     5 |
    |       111 |    135 |   468 |         |       111 |    135 |   246 |     6 |
    |       111 |    135 |   268 |         |       111 |    135 |   468 |     7 |
    |       111 |    268 |   321 |         |       111 |    135 |   268 |     8 |
    |       111 |    138 |   139 |         |       111 |    268 |   321 |     9 |
    |       111 |    111 |   135 |         |       111 |    111 |   138 |    10 |
    |       111 |    111 |   138 |         |       111 |    138 |   139 |    11 |
    |       222 |    222 |   654 |         |       222 |    222 |   222 |    12 |
    |       222 |    654 |   721 |         |       222 |    222 |   987 |    13 |
    |       222 |    222 |   222 |         |       222 |    222 |   654 |    14 |
    |       222 |    721 |   127 |         |       222 |    654 |   721 |    15 |
    |       222 |    222 |   987 |         |       222 |    721 |   127 |    16 |
    |       333 |    333 |   398 |         |       333 |    333 |   333 |    17 |
    |       333 |    333 |   498 |         |       333 |    333 |   398 |    18 |
    |       333 |    333 |   333 |         |       333 |    333 |   498 |    19 |
    |       333 |    333 |   598 |         |       333 |    333 |   598 |    20 |
    +-----------+--------+-------+         +-----------+--------+-------+-------+

树表示(期望值在每个节点旁边表示):

                      +-----+                                           +-----+
                   1  | 111 |                                       17  | 333 |
                      +--+--+                                           +--+--+
                         |                                                 |
         +---------------+--------+-----------------+           +----------+----------+
         |                        |                 |           |          |          |
      +--v--+                  +--v--+           +--v--+     +--v--+    +--v--+    +--v--+
   2  | 123 |                5 | 135 |        10 | 138 |     | 398 |    | 498 |    | 598 |
      +--+--+                  +--+--+           +--+--+     +--+--+    +--+--+    +--+--+  
   +-----+-----+         +--------+--------+        |          18         19         20
   |           |         |        |        |        |  
+--v--+     +--v--+   +--v--+  +--v--+  +--v--+  +--v--+ 
| 789 |     | 456 |   | 246 |  | 468 |  | 268 |  | 139 |                 +-----+
+-----+     +-----+   +-----+  +-----+  +--+--+  +-----+             12  | 222 |
   3           4         6        7      8 |        11                   +--+--+
                                        +--v--+                             |
                                        | 321 |                      +------+-------+
                                        +--+--+                      |              |
                                           9                      +--v--+        +--v--+
                                                               13 | 987 |    14  | 654 |
                                                                  +--+--+        +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             15  | 721 |
                                                                                 +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             16  | 127 |
                                                                                 +--+--+

代码片段:

Dataset<Row> myDataset = spark
                .sql("select Global_ID, Parent, Child from RECORDS");

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
        functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
    .orderBy(new Column("Global_ID"))
    .withColumn("vars", functions.explode(<Spark UDF>)
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
    .javaRDD().zipWithIndex();


// Sample UDF (TODO: Actual Implementation)   
spark.udf().register("computeValue",
                (<Column Names>) -> <functionality & implementation>,
                DataTypes.<xxx>);

经过大量研究和博客中的许多建议后,我尝试了以下方法,但无济于事,无法达到我的场景的结果。

技术栈:

  • Apache Spark (v2.1.1)

  • 爪哇 8

  • AWS EMR 集群(Spark 应用部署)


数据量:

  • 数据集中大约 2000 万行

尝试的方法:

  1. Spark GraphX + GraphFrames:

  2. Spark GraphX Pregel API:


对当前方法中的替代(或)修改的任何建议都将非常有帮助,因为我完全迷失了为这个用例找出解决方案。

感谢你的帮助!谢谢!

4

1 回答 1

0

注意:以下解决方案在 scala spark 中。您可以轻松地转换为 java 代码。

看一下这个。我尝试使用 Spark Sql 进行操作,您可以了解一下。基本上想法是在聚合和分组它们时对子、父和 globalid 进行排序。一旦按 globalid 分组和排序,就展开其余的。您将获得有序的结果表,稍后您可以zipWithIndex将排名(值)添加到该表中

   import org.apache.spark.sql.SQLContext
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.expressions.UserDefinedFunction
   import org.apache.spark.sql.functions.udf

   val sqlContext = new SQLContext(sc)
   import sqlContext.implicits._

   val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
   val ddd = sc.parallelize(t).toDF
   val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
   val dd1 = ddd
    .groupBy($"_1")
    .agg(sort_array(collect_list($"_2")).as("v"),
         sort_array(collect_list($"_3")).as("w"))
    .orderBy(asc("_1"))
    .withColumn("vars", explode(zip($"v", $"w")))
    .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex

  dd1.collect

输出

    res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))
于 2017-12-27T19:09:32.120 回答