0

来自关系数据库的数据被加载到 spark 中——据说是每天,但实际上不是每天。此外,它是数据库的完整副本 - 没有增量加载。

为了轻松地将维度表与主要事件数据连接起来,我想要:

  • 对其进行重复数据删除(即提高以后广播加入的可能性)
  • 有 valid_to/valid_from 列,所以即使数据不是每天可用(不一致),它仍然可以很好地使用(从下游)

我正在使用 spark 3.0.1 并希望以 SCD2 样式转换现有数据 - 而不会丢失历史记录。

spark-shell

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

case class Foo (key:Int, value:Int, date:String)
val d = Seq(Foo(1, 1, "20200101"), Foo(1, 8, "20200102"), Foo(1, 9, "20200120"),Foo(1, 9, "20200121"),Foo(1, 9, "20200122"), Foo(1, 1, "20200103"), Foo(2, 5, "20200101"), Foo(1, 10, "20200113")).toDF
d.show

val windowDeduplication =  Window.partitionBy("key", "value").orderBy("key", "date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key", "date")

val nextThing = lead("date", 1).over(windowPrimaryKey)
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to", nextThing).withColumn("valid_to", when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show

结果是:

+---+-----+----------+----------+
|key|value|valid_from|  valid_to|
+---+-----+----------+----------+
|  1|    1|2020-01-01|2020-01-01|
|  1|    8|2020-01-02|2020-01-12|
|  1|   10|2020-01-13|2020-01-19|
|  1|    9|2020-01-20|2020-10-09|
|  2|    5|2020-01-01|2020-10-09|
+---+-----+----------+----------+

这已经很不错了。然而:

|  1|    1|2020-01-03|   2|2020-01-12|

丢失了。即,稍后(中间更改之后)再次出现的任何值都将丢失。如何在不保持更大排名的情况下保持这一行,例如:

d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).withColumn("valid_to", nextThing).withColumn("valid_to", 

when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show

+---+-----+----------+----+----------+
|key|value|valid_from|rank|  valid_to|
+---+-----+----------+----+----------+
|  1|    1|2020-01-01|   1|2020-01-01|
|  1|    8|2020-01-02|   1|2020-01-02|
|  1|    1|2020-01-03|   2|2020-01-12|
|  1|   10|2020-01-13|   1|2020-01-19|
|  1|    9|2020-01-20|   1|2020-01-20|
|  1|    9|2020-01-21|   2|2020-01-21|
|  1|    9|2020-01-22|   3|2020-10-09|
|  2|    5|2020-01-01|   1|2020-10-09|
+---+-----+----------+----+----------+

这绝对是不希望的

  • 这个想法是删除重复项
  • 但使用 valid_to、valid_from 保留对数据的任何历史更改

如何正确地将其转换为 SCD2 表示,即具有 valid_from、valid_to 但不丢弃中间状态?

注意:我不需要更新现有数据(合并、加入)。重新创建/覆盖它很好。

在 Spark 中实现 SCD 类型 2似乎太复杂了。在不需要状态处理的情况下,有没有更好的方法?即我有来自数据库的每日完整副本的数据,并希望对其进行重复数据删除。

4

1 回答 1

0

以前的方法只保留第一个(最早的)版本的副本。我认为唯一没有连接状态处理的解决方案是使用窗口函数,其中每个值都与前一行进行比较 - 如果整行没有变化,则将其丢弃。

可能效率较低 - 但更准确。但这也取决于手头的用例,即再次看到更改的值的可能性有多大。

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

case class Foo (key:Int, value:Int, value2:Int, date:String)
val d = Seq(Foo(1, 1,1, "20200101"), Foo(1, 8,1, "20200102"), Foo(1, 9,1, "20200120"),Foo(1, 6,1, "20200121"),Foo(1, 9,1, "20200122"), Foo(1, 1,1, "20200103"), Foo(2, 5,1, "20200101"), Foo(1, 10,1, "20200113"), Foo(1, 9,1, "20210120"),Foo(1, 9,1, "20220121"),Foo(1, 9,3, "20230122")).toDF

def compare2Rows(key:Seq[String], sortChangingIgnored:Seq[String], timeColumn:String)(df:DataFrame):DataFrame = {
    val windowPrimaryKey = Window.partitionBy(key.map(col):_*).orderBy(sortChangingIgnored.map(col):_*)
    val columnsToCompare = df.drop(key ++ sortChangingIgnored:_*).columns

    val nextDataChange = lead(timeColumn, 1).over(windowPrimaryKey)

    val deduplicated = df.withColumn("data_changes", columnsToCompare.map(e=> col(e) =!= lead(col(e), 1).over(windowPrimaryKey)).reduce(_ or _)).filter(col("data_changes").isNull or col("data_changes"))
    deduplicated.withColumn("valid_to", when(nextDataChange.isNotNull, date_sub(nextDataChange, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").drop("data_changes")
}
d.orderBy("key", "date").show
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).transform(compare2Rows(Seq("key"), Seq("date"), "date")).orderBy("key", "valid_from", "valid_to").show

返回:

+---+-----+------+----------+----------+
|key|value|value2|valid_from|  valid_to|
+---+-----+------+----------+----------+
|  1|    1|     1|2020-01-01|2020-01-01|
|  1|    8|     1|2020-01-02|2020-01-02|
|  1|    1|     1|2020-01-03|2020-01-12|
|  1|   10|     1|2020-01-13|2020-01-19|
|  1|    9|     1|2020-01-20|2020-01-20|
|  1|    6|     1|2020-01-21|2022-01-20|
|  1|    9|     1|2022-01-21|2023-01-21|
|  1|    9|     3|2023-01-22|2020-10-09|
|  2|    5|     1|2020-01-01|2020-10-09|
+---+-----+------+----------+----------+

对于以下输入:

+---+-----+------+--------+
|key|value|value2|    date|
+---+-----+------+--------+
|  1|    1|     1|20200101|
|  1|    8|     1|20200102|
|  1|    1|     1|20200103|
|  1|   10|     1|20200113|
|  1|    9|     1|20200120|
|  1|    6|     1|20200121|
|  1|    9|     1|20200122|
|  1|    9|     1|20210120|
|  1|    9|     1|20220121|
|  1|    9|     3|20230122|
|  2|    5|     1|20200101|
+---+-----+------+--------+

这个函数的缺点是建立了无限量的状态 - 对于每个键......但是当我计划将它应用于相当小的维度表时,我认为无论如何它应该没问题。

于 2020-10-09T19:24:54.157 回答