0

我正在将数据加载到雪花数据保险库建模数据库中。当行的字段已更新时,模型的工作方式如下:

  1. 将此行的加载结束日期设置为等于current_timestamp()
  2. 将具有新值的同一行再次添加到模型中。

merge在 JavaScript 过程中使用来自 Snowflake 的命令来执行此操作:

var observarion_query = "MERGE INTO HUB_OBSERVATION AS OBS "+
"USING (SELECT DATE(T.$"+OBSERVATION_DATE+", 'DD/MM/YYYY') AS OBS_DATE, T.$"+LOCATIONS+", T.$"+SUBMISSION_TIME+" FROM "+FILE_FULL_PATH+"(FILE_FORMAT=>"+FILE_FORMAT_NAME+") T) ST "+
"ON md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))) = OBS.OBSERVATION_DATE_LOCATION_HASH_KEY "+
"WHEN MATCHED THEN UPDATE SET OBS.LOAD_END_DT = CURRENT_TIMESTAMP() "+
"WHEN NOT MATCHED THEN "+
"INSERT (OBSERVATION_DATE_LOCATION_HASH_KEY, LOAD_DT, LOAD_END_DT, RECORD_SRC, OBSERVATION_DATE, LOCATION_NAME) "+
"VALUES (md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))), current_timestamp(), NULL, 'ONA', ST.OBS_DATE, CONCAT('CAMP', ST.$"+LOCATION_POSITION+")) ";

问题出在 内WHEN MATCHED THEN,我需要用它的新值对同一行进行插入,但有额外的条件说:

WHEN MATCHED and OBS.REVIEW_STATUS <> ST.REVIEW_STATUS THEN
// INSERT THE ROW

而且我真的知道我们不能在WHEN MATCHED THEN语句中执行插入查询。

我们怎样才能找到转机呢?

4

2 回答 2

4

如果我正确理解您的问题,您希望单个源行可能对目标表造成 2 项操作:

  • 将 LOAD_END_DT 更新为当前时间戳(如果键已存在于目标表中且 LOAD_END_DT = NULL,则表示它是“当前”)
  • 使用最新信息在目标表中插入新行

您可以通过使用 UNION ALL 将 USING 子句中的每个源行“拆分”为 2 行来实现此目的:一行用于 UPDATE,一行用于 INSERT。我通常包含一个布尔标志来区分它们(因为它们是重复的)。在 INSERT 部分,我执行SELECT ... WHERE NOT EXISTS (SELECT 1 FROM target WHERE key = key and MD5() = MD5())以便仅当新行与当前行。我的 ON 子句在表示 UPDATE 场景的布尔值上有一个过滤器。

[编辑以包含示例 MERGE]

首先,假设以下阶段和最终表定义:

CREATE OR REPLACE TRANSIENT TABLE T_STAGE (
  ID            INTEGER
 ,COL1          VARCHAR
 ,COL2          VARCHAR
 ,COL3          VARCHAR
)
;
CREATE OR REPLACE TRANSIENT TABLE T_FINAL (
  ID            INTEGER
 ,START_TS      TIMESTAMP_LTZ
 ,END_TS        TIMESTAMP_LTZ
 ,COL1          VARCHAR
 ,COL2          VARCHAR
 ,COL3          VARCHAR
 ,COL_MD5_HASH  VARCHAR
)
;

此 MERGE 说明了使用 UNION ALL 将单个源行拆分为 2,以便可以对目标表应用 INSERT 和 UPDATE:

MERGE INTO T_FINAL AS TGT
  USING (
    WITH CTE_X AS (
      SELECT ID
            ,COL1
            ,COL2
            ,COL3
            ,MD5(ARRAY_TO_STRING(ARRAY_CONSTRUCT(ID, COL1, COL2, COL3), '^')) AS COL_MD5_HASH
        FROM T_STAGE
    )
    SELECT FALSE AS UPDATE_FLAG
          ,X.ID
          ,X.COL1
          ,X.COL2
          ,X.COL3
          ,X.COL_MD5_HASH
      FROM CTE_X X
     WHERE NOT EXISTS (
             SELECT 1
               FROM T_FINAL T2
              WHERE T2.COL_MD5_HASH = X.COL_MD5_HASH
           )
    UNION ALL
    SELECT TRUE AS UPDATE_FLAG
          ,X.ID
          ,X.COL1
          ,X.COL2
          ,X.COL3
          ,X.COL_MD5_HASH
      FROM CTE_X X
           JOIN T_FINAL T3
             ON T3.END_TS IS NULL
            AND T3.ID = X.ID
            AND T3.COL_MD5_HASH != X.COL_MD5_HASH
  ) AS SRC
  ON TGT.END_TS IS NULL
 AND SRC.ID = TGT.ID
 AND SRC.UPDATE_FLAG
 WHEN NOT MATCHED THEN INSERT (ID, START_TS, END_TS, COL1, COL2, COL3, COL_MD5_HASH)
   VALUES (SRC.ID, CURRENT_TIMESTAMP(), NULL, SRC.COL1, SRC.COL2, SRC.COL3, SRC.COL_MD5_HASH)
 WHEN MATCHED THEN UPDATE SET END_TS = CURRENT_TIMESTAMP()
;

根据您的规格,有许多假设和变化。例如,如果在每次 MERGE 后清除阶段表行,则可以删除 NOT EXISTS ......它只是为了避免多次插入同一阶段行。您必须进行调整以匹配您的规格。这仅用于说明目的(因为您提出了要求)。

于 2021-01-21T02:37:48.973 回答
1

逻辑(或周转)可以是:

  1. 为更新的行设置 load_end_dates 并插入全新的行:

    合并 ... 匹配时更新 不匹配时插入

  2. 使用新的 validfrom/validtos 插入更新的行,例如:

    INSERT INTO target_table JOIN source_table ON target_table.key = source_table.key WHERE target_table.col1<>source_table.col1, ...(因此您必须在此处识别更新的行)

如果要插入更新的行,则必须识别它们。这是通过您的 target_table 和 source_table 之间的比较来完成的。如何识别更新的行:

  1. 使用表的键加入target_table和source_table(如果你想通过业务键加入)
  2. 过滤列已更改的那些行 --> 添加一个 WHERE 子句来检查诸如 source.colA != target.colA 或 source.colB != target.colB 之类的内容
  3. 将此选择语句的结果插入到您的 Satellite

另一个提示可能是使用 Changed hash key 并检查是否有不同的 change hash 用于业务实体的最新记录,请参见此处:https ://www.hansmichiels.com/2016/04/09/hash-diff -计算-with-sql-server-datavault-series/

于 2021-01-20T09:10:39.957 回答