来自关系数据库的数据被加载到 spark 中——据说是每天,但实际上不是每天。此外,它是数据库的完整副本 - 没有增量加载。
为了轻松地将维度表与主要事件数据连接起来,我想要:
- 对其进行重复数据删除(即提高以后广播加入的可能性)
- 有 valid_to/valid_from 列,所以即使数据不是每天可用(不一致),它仍然可以很好地使用(从下游)
我正在使用 spark 3.0.1 并希望以 SCD2 样式转换现有数据 - 而不会丢失历史记录。
spark-shell
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
case class Foo (key:Int, value:Int, date:String)
val d = Seq(Foo(1, 1, "20200101"), Foo(1, 8, "20200102"), Foo(1, 9, "20200120"),Foo(1, 9, "20200121"),Foo(1, 9, "20200122"), Foo(1, 1, "20200103"), Foo(2, 5, "20200101"), Foo(1, 10, "20200113")).toDF
d.show
val windowDeduplication = Window.partitionBy("key", "value").orderBy("key", "date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key", "date")
val nextThing = lead("date", 1).over(windowPrimaryKey)
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to", nextThing).withColumn("valid_to", when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show
结果是:
+---+-----+----------+----------+
|key|value|valid_from| valid_to|
+---+-----+----------+----------+
| 1| 1|2020-01-01|2020-01-01|
| 1| 8|2020-01-02|2020-01-12|
| 1| 10|2020-01-13|2020-01-19|
| 1| 9|2020-01-20|2020-10-09|
| 2| 5|2020-01-01|2020-10-09|
+---+-----+----------+----------+
这已经很不错了。然而:
| 1| 1|2020-01-03| 2|2020-01-12|
丢失了。即,稍后(中间更改之后)再次出现的任何值都将丢失。如何在不保持更大排名的情况下保持这一行,例如:
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).withColumn("valid_to", nextThing).withColumn("valid_to",
when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show
+---+-----+----------+----+----------+
|key|value|valid_from|rank| valid_to|
+---+-----+----------+----+----------+
| 1| 1|2020-01-01| 1|2020-01-01|
| 1| 8|2020-01-02| 1|2020-01-02|
| 1| 1|2020-01-03| 2|2020-01-12|
| 1| 10|2020-01-13| 1|2020-01-19|
| 1| 9|2020-01-20| 1|2020-01-20|
| 1| 9|2020-01-21| 2|2020-01-21|
| 1| 9|2020-01-22| 3|2020-10-09|
| 2| 5|2020-01-01| 1|2020-10-09|
+---+-----+----------+----+----------+
这绝对是不希望的
- 这个想法是删除重复项
- 但使用 valid_to、valid_from 保留对数据的任何历史更改
如何正确地将其转换为 SCD2 表示,即具有 valid_from、valid_to 但不丢弃中间状态?
注意:我不需要更新现有数据(合并、加入)。重新创建/覆盖它很好。
即在 Spark 中实现 SCD 类型 2似乎太复杂了。在不需要状态处理的情况下,有没有更好的方法?即我有来自数据库的每日完整副本的数据,并希望对其进行重复数据删除。