
I have two tables. Table t1 defines the metadata, i.e. which attribute values an ideal transaction should contain. It also defines the order of importance of the attributes through the order of the entries in the array: the first entry is the most important and carries a weight of 1, the second 0.9, the third 0.8, the fourth 0.7, and so on (i.e. weight = 1 - 0.1 * position, counting positions from 0). Anything beyond the 10th attribute is considered least important. I need to measure the quality of the data filled in the transaction table t2: the percentage of attributes that are populated, and a quality rank for them.

t1

------------------------------------
|  a_id  |    attribute_values      |
------------------------------------
|  12345 | ["a1", "a2", "a3", "a5"] |
|  6789  | ["b1", "b4", "b7"]       |
------------------------------------
 

t2

------------------------------------
| b_id |  a_id  | attribute_values |
------------------------------------
| B123 | 12345  | ["a2", "a5"]     |
| B456 | 6789   | ["b1", "b7"]     |
------------------------------------

I am looking for a way to compute the quality rank of my t2 records, like this:

------------------------------------------
| b_id | percent_complete | quality_rank |
------------------------------------------
| B123 |       50         |     0.4      |
| B456 |       66.66      |     0.6      |
------------------------------------------

B123 - 50% complete (2 of 4). Quality rank: (0.9 + 0.7) / 4 = 0.4

B456 - 66.66% complete (2 of 3). Quality rank: (1 + 0.8) / 3 = 0.6
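In general, if t1 defines n attributes for an a_id and m of them appear in the t2 record:

percent_complete = m / n * 100
quality_rank = (sum of the weights of the matched attributes) / n, where weight = 1 - 0.1 * position (0-based)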


1 Answer


Solved it by exploding both tables: compute the weights and the attribute counts for the first table, then join it with the other one. I couldn't manage it in a single SQL statement, though (a chained DataFrame version is sketched at the end).

scala> val t1 = Seq((12345, List("a1", "a2", "a3", "a5")), (6789, List("b1", "b4", "b7"))).toDF("a_id", "attribute_values")

scala> val t2 = Seq(("B123", 12345, List("a2", "a5")), ("B456", 6789, List("b1", "b7"))).toDF("b_id ","a_id", "attribute_values")

scala> val t1_1 = t1.select($"a_id", posexplode($"attribute_values"))

scala> t1_1.show

+-----+---+---+
| a_id|pos|col|
+-----+---+---+
|12345|  0| a1|
|12345|  1| a2|
|12345|  2| a3|
|12345|  3| a5|
| 6789|  0| b1|
| 6789|  1| b4|
| 6789|  2| b7|
+-----+---+---+
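(posexplode names the generated columns pos and col by default; if explicit names are preferred, the same line can be written as t1.select($"a_id", posexplode($"attribute_values").as(Seq("pos", "col"))).)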

scala> t1_1.createOrReplaceTempView("tab_t1_1")

scala> spark.sql("select *, 1 - (pos * 0.1) as calc_weight, count(col) over (partition by a_id) as rec_count from tab_t1_1").show

+-----+---+---+-----------+---------+
| a_id|pos|col|calc_weight|rec_count|
+-----+---+---+-----------+---------+
| 6789|  0| b1|        1.0|        3|
| 6789|  1| b4|
| 6789|  2| b7|        0.8|        3|
|12345|  0| a1|        1.0|        4|
|12345|  1| a2|        0.9|        4|
|12345|  2| a3|        0.8|        4|
|12345|  3| a5|        0.7|        4|
+-----+---+---+-----------+---------+
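Note that 1 - (pos * 0.1) reaches zero at the 11th attribute and goes negative beyond it. If attributes past the 10th should simply count as "least important", as the question suggests, the weight could be clamped; the 0.1 floor below is my assumption:

// Assumed variant: floor the weight at 0.1 so attributes past the 10th
// still count as least important instead of getting a zero/negative weight.
spark.sql("select *, greatest(1 - (pos * 0.1), 0.1) as calc_weight, count(col) over (partition by a_id) as rec_count from tab_t1_1")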

scala> val t1_2 = spark.sql("select *, 1 - (pos * 0.1) as calc_weight, count(col) over (partition by a_id) as rec_count from tab_t1_1")

scala> t1_2.createOrReplaceTempView("tab_t1_2")

scala> val t2_1 = t2.select($"b_id", $"a_id", explode($"attribute_values"))

scala> t2_1.show

+----+-----+---+
|b_id| a_id|col|
+----+-----+---+
|B123|12345| a2|
|B123|12345| a5|
|B456| 6789| b1|
|B456| 6789| b7|
+----+-----+---+

scala> t2_1.createOrReplaceTempView("tab_t2_1")

scala> spark.sql("选择b_id, t1.a_id, round(count(t2.col)*100/max(t1.rec_count),2) as percent_complete, round(sum(t1.calc_weight)/ max(t1. rec_count),2) as quality_rank from tab_t1_2 t1, tab_t2_1 t2 where t1.a_id = t2.a_id and t1.col = t2.col group by b_id, t1.a_id").show

+----+-----+----------------+------------+
|b_id| a_id|percent_complete|quality_rank|
+----+-----+----------------+------------+
|B123|12345|            50.0|        0.40|
|B456| 6789|           66.67|        0.60|
+----+-----+----------------+------------+
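For reference, the same pipeline can be written as one chained DataFrame expression. This is a sketch against the t1/t2 DataFrames defined above, run inside the spark-shell (where $ is available); it mirrors the steps shown rather than being a single SQL statement:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Explode t1 with positions, derive the weight and the per-a_id attribute count.
val t1w = t1
  .select($"a_id", posexplode($"attribute_values").as(Seq("pos", "col")))
  .withColumn("calc_weight", lit(1.0) - $"pos" * 0.1)
  .withColumn("rec_count", count($"col").over(Window.partitionBy($"a_id")))

// Explode t2, join on (a_id, col), and aggregate per transaction.
val result = t2
  .select($"b_id", $"a_id", explode($"attribute_values").as("col"))
  .join(t1w, Seq("a_id", "col"))
  .groupBy($"b_id", $"a_id")
  .agg(
    round(count($"col") * 100 / max($"rec_count"), 2).as("percent_complete"),
    round(sum($"calc_weight") / max($"rec_count"), 2).as("quality_rank"))

result.show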