0

问题

主要问题

如何从每日提取的文件夹中临时实现缓慢变化的维度类型 2,其中每个 csv 都是来自源系统的表的完整提取?

理由

我们正在设计临时数据仓库作为最终用户的数据集市,可以在没有任何后果的情况下启动和烧毁。这要求我们在一个湖/块/桶中拥有所有数据。

我们每天都在提取完整的提取物,因为:

  1. 我们不能可靠地仅提取变更集(出于我们无法控制的原因),并且
  2. 我们想维护一个包含“最原始”数据的数据湖。

挑战问题

有没有一种解决方案可以给我一个特定日期的状态,而不仅仅是“最新”状态?

存在问题

我是否完全倒退了这一点,并且有更简单的方法可以做到这一点?

可能的方法

自定义dbt物化

包中有一个insert_by_perioddbt 实现dbt.utils,我认为这可能正是我正在寻找的?但是我很困惑dbt snapshot,但是:

  1. dbt snapshot一次增量地为每个文件运行;和,
  2. 直接从外部表构建?

三角洲湖

我对 Databricks 的 Delta Lake 了解不多,但看起来应该可以使用Delta Tables吗?

修复提取作业

如果我们可以让我们的提取物只包含自上次提取物以来发生的变化,我们的问题就解决了吗?

例子

假设以下三个文件位于数据湖的文件夹中。(要点 3 个 csvs 和所需的表格结果为 csv)。我添加了 Extracted 列,以防从文件名中解析时间戳太棘手。

2020-09-14_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/14      |
| 2     | B      | 3 - Propose |     | 9/12         | 9/14      |

2020-09-15_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/15      |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/15      |
| 3     | C      | 1 - Lead    |     | 9/14         | 9/15      |

2020-09-16_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/16      |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/16      |
| 3     | C      | 2 - Qualify |     | 9/15         | 9/16      |

最终结果

以下是截至 9 月 16 日的三个文件的 SCD-II。9/15 的 SCD-II 将是相同的,但OppId=3只有一个来自valid_from=9/15valid_to=null

| OppId | CustId | Stage       | Won | LastModified | valid_from | valid_to |
|-------|--------|-------------|-----|--------------|------------|----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/14       | null     |
| 2     | B      | 3 - Propose |     | 9/12         | 9/14       | 9/15     |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/15       | null     |
| 3     | C      | 1 - Lead    |     | 9/14         | 9/15       | 9/16     |
| 3     | C      | 2 - Qualify |     | 9/15         | 9/16       | null     |
4

2 回答 2

2

有趣的概念,当然,要完全了解您的业务、利益相关者、数据等,这将比在此论坛上进行更长的对话。我可以看到,如果您的数据量相对较小,它可能会起作用,您的源系统很少更改,您的报告要求(以及因此的数据集市)也很少发生变化,您只需很少启动这些数据集市。

我的担忧是:

  1. 如果您的源或目标需求发生变化,您将如何处理?您将需要启动您的数据集市,对其进行全面的回归测试,应用您的更改,然后对其进行测试。如果您在更改已知时执行此操作,那么对于未使用的 Datamart 来说需要付出很多努力 - 特别是如果您需要在两次使用之间多次执行此操作;如果您在需要数据集市时执行此操作,那么您将无法实现让数据集市可“即时”使用的目标。

您的陈述“我们有一个 DW 作为代码,可以删除、更新和重新创建,而不会像传统 DW 变更管理那样复杂”,我不确定是否属实。在不启动数据集市并通过数据的标准测试周期的情况下,您将如何测试代码更新 - 那么这与传统的 DW 变更管理有何不同?

  1. 如果源系统中存在损坏/意外数据会怎样?在您每天加载数据的“正常” DW 中,这通常会在当天被注意到并修复。在您的解决方案中,可疑数据可能在数天/数周前发生,并且假设它已加载到您的数据集市而不是在加载时出错,您将需要适当的流程来发现它,然后可能必须解开数天的 SCD 记录以解决问题
  2. (仅当您拥有大量数据时才相关)鉴于存储成本低,我不确定我是否看到了在需要时启动数据集市而不是仅仅保存数据以供使用的好处。每次启动数据集市时加载大量数据将既耗时又昂贵。可能的混合方法可能是仅在需要数据集市时才运行增量加载,而不是每天运行它们——因此您可以随时准备好上次使用数据集市时的数据,并且您只需添加创建/更新的记录最后的负载
于 2020-09-18T09:59:58.600 回答
1

我不知道这是否是最好的,但我已经看到它完成了。当您构建初始 SCD-II 表时,添加一个列,该列是记录HASH()的所有值的存储值(您可以排除主键)。然后,您可以每天在传入的完整数据集上创建一个外部表,其中包含相同的HASH()功能。现在,您可以根据主键以及 HASH 值是否已更改来执行MERGE或针对您的 SCD-II。INSERT/UPDATE

这样做的主要优势是避免每天将所有数据加载到 Snowflake 中进行比较,但以这种方式执行会更慢。HASH()您还可以使用语句中包含的函数加载到临时表,COPY INTO然后更新您的 SCD-II,然后删除临时表,这实际上可能更快。

于 2020-09-17T02:58:19.460 回答