1

我需要使用经典的平面表在 BigQuery 中执行合并语句,将具有嵌套和重复字段的表作为目标,但我无法理解它应该如何工作。Google 的示例使用直接值,所以这里的语法对我来说不是很清楚。

使用这个例子:

CREATE OR REPLACE TABLE
  mydataset.DIM_PERSONA (
    IdPersona STRING,
    Status STRING,
    Properties ARRAY<STRUCT<
      Id STRING,
      Value STRING,
      _loadingDate TIMESTAMP,
      _lastModifiedDate TIMESTAMP
    >>,
    _loadingDate TIMESTAMP NOT NULL,
    _lastModifiedDate TIMESTAMP
);

INSERT INTO mydataset.DIM_PERSONA
values
  ('A', 'KO', [('FamilyMembers', '2', CURRENT_TIMESTAMP(), TIMESTAMP(NULL))], CURRENT_TIMESTAMP(), TIMESTAMP(NULL)),
  ('B', 'KO', [('FamilyMembers', '4', CURRENT_TIMESTAMP(), TIMESTAMP(NULL)),('Pets', '1', CURRENT_TIMESTAMP(), NULL)], CURRENT_TIMESTAMP(), TIMESTAMP(NULL))
;

CREATE OR REPLACE TABLE
  mydataset.PERSONA (
    IdPersona STRING,
    Status STRING,
    IdProperty STRING,
    Value STRING
);

INSERT INTO mydataset.PERSONA
VALUES('A', 'OK','Pets','3'),('B', 'OK','FamilyMembers','5'),('C', 'OK','Pets','2')

目标是:

  1. 更新 IdPersona='A',在 Properties 中添加一个新元素并更改 Status
  2. 更新 IdPersona='B',更新 Properties 中的现有元素
  3. 插入 IdPersona='C'

此插入有效:

MERGE INTO mydataset.DIM_PERSONA TRG
USING (
  SELECT
    IdPersona,
    Status,
    ARRAY(
      SELECT AS STRUCT
        IdProperty,
        Value,
        CURRENT_TIMESTAMP(), 
        TIMESTAMP(NULL)
    ) Properties, 
    CURRENT_TIMESTAMP(),
    TIMESTAMP(NULL)
  FROM mydataset.PERSONA
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (IdPersona, Status, Properties, CURRENT_TIMESTAMP(), TIMESTAMP(NULL))

但我想在 INSERT 子句中构建嵌套/重复字段,因为对于 UPDATE,我还需要(我认为)通过将 TRG 的值与 SRC 进行比较来执行“SELECT AS STRUCT * REPLACE”。这不起作用:

MERGE INTO mydataset.DIM_PERSONA TRG
USING (
  SELECT
    *
  FROM mydataset.PERSONA
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (
  IdPersona,
  Status,
  ARRAY(
    SELECT AS STRUCT
      IdProperty,
      Value,
      CURRENT_TIMESTAMP(), 
      TIMESTAMP(NULL)
  ),
  CURRENT_TIMESTAMP(),
  TIMESTAMP(NULL)
)

我得到“INSERT 子句不支持相关子查询。”

即使我使用了第一个选项,我也不知道如何在更新中引用 TRG.properties ..

WHEN MATCHED THEN
UPDATE
SET Properties = ARRAY(
  SELECT AS STRUCT p_SRC.*
    REPLACE (IF(p_SRC.IdProperty=p_TRG.id AND p_SRC.Value<>p_TRG.Value,p_SRC.Value,p_TRG.Value) AS Value)
  FROM SRC.Properties p_SRC, TRG.Properties p_TRG
)

Obv 这是错误的。

在我看来,解决这个问题的一种方法是预先加入 USING 子句中的所有内容,因此在那里进行所有替换,但对于合并语句来说感觉非常错误。

谁能帮我解决这个问题,好吗?:\

4

2 回答 2

0

所以,我想分享一个可能的解决方案,尽管我仍然希望有另一种方法。如前所述,我使用 CTE 和 FULL OUTER JOIN 预先计算了我需要的内容,因此重新创建了我稍后需要的结构数组(表会相对较小,所以我负担得起)。

MERGE INTO mydataset.DIM_PERSONA TRG
USING (
  WITH NEW_PROPERTIES AS (
    SELECT
      COALESCE(idp,IdPersona) IdPersona,
      ARRAY_AGG((
        SELECT AS STRUCT
          COALESCE(idpro,Id) IdProperty,
          COALESCE(vl,Value) Value,
          COALESCE(_loadingDate,CURRENT_TIMESTAMP) _loadingDate,
          IF(idp=IdPersona,CURRENT_TIMESTAMP,TIMESTAMP(NULL)) _lastModifiedDate
      )) Properties
    FROM (
      SELECT DIP.IdPersona, DIP.Status, DIP_PR.*, PER.IdPersona idp, PER.Status st, PER.IdProperty idpro, PER.Value vl
      FROM `clean-yew-281811.mydataset.DIM_PERSONA` DIP
      CROSS JOIN UNNEST(DIP.Properties) DIP_PR
      FULL OUTER JOIN mydataset.PERSONA PER
        ON  DIP.IdPersona=PER.IdPersona
        AND DIP_PR.Id=PER.IdProperty 
    )
    GROUP BY IdPersona
  )
  
  SELECT
    IdPersona,
    'subquery to do here' Status,
    NP.Properties
  FROM (SELECT DISTINCT IdPersona FROM mydataset.PERSONA) PE
  LEFT JOIN NEW_PROPERTIES NP USING (IdPersona)
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (IdPersona, Status, Properties, CURRENT_TIMESTAMP(), TIMESTAMP(NULL))
WHEN MATCHED THEN
UPDATE
SET
  TRG.Status = SRC.Status,
  TRG.Properties = SRC.Properties,
  TRG._lastModifiedDate = CURRENT_TIMESTAMP()

这可行,但我几乎避免使用更新结构数组的语法,因为我正在做的是重建和替换操作。希望有人可以提出更好的方法。

于 2020-08-15T15:55:08.867 回答
0

此外,虽然您没有提供所需的输出,但我能够根据您描述的目标和代码以及您提供的示例数据创建查询。

遵循以下目标:

  1. 更新 IdPersona='A',在 Properties 中添加一个新元素并更改 Status
  2. 更新 IdPersona='B',更新 Properties 中的现有元素
  3. 插入 IdPersona='C'

我没有进行替换和重建操作,而是使用:

  • MERGE ;为了执行更新并插入新行,例如 IdPersona = "C"
  • INSERT:在合并中不能将 INSERT 与 WHEN MATCHED 一起使用。因此,为了在 IdPerson="A" 时添加新属性,在 MERGE 操作之后使用此方法。
  • CREATE TABLE : 使用 INSERT 后,IdPersona="A" 时的新属性不会聚合,因为我们没有使用 WHEN MATCHED。因此,最终表 DM_PERSONA 被替换以正确聚合结果。
  • LEFT JOIN:为了添加字段_loadingDate和 *_lastModifiedDate *,它们不会聚合到ARRAY<STRUCT<>>中。

以下是带有正确注释的查询:

#first step update current values and insert new IdPersonas
MERGE sample.DIM_PERSONA_test2 T
USING sample.PERSONA_test2 S
ON T.IdPersona = S.IdPersona

#update A but not insert
WHEN MATCHED AND T.IdPersona ="A" THEN
UPDATE SET STATUS = "OK"

#update B
WHEN  MATCHED AND T.IdPersona ="B" THEN
UPDATE SET Properties = [( S.IdPersona, S.IdProperty,TIMESTAMP(NULL), TIMESTAMP(NULL) )]

#insert what is not in the target table
WHEN NOT MATCHED THEN
INSERT(IdPersona, Status , Properties, _loadingDate, _lastModifiedDate ) VALUES (S.IdPersona, S.Status, [( IdProperty,Value, TIMESTAMP(NULL), TIMESTAMP(NULL))], CURRENT_TIMESTAMP(), TIMESTAMP(NULL));

#insert new values when IdPersona="A"
#you will see the result won't be aggregated properly
INSERT INTO sample.DIM_PERSONA_test2(IdPersona, Status , Properties, _loadingDate, _lastModifiedDate)
SELECT IdPersona, Status,[( IdProperty,Value, TIMESTAMP(NULL), TIMESTAMP(NULL))], CURRENT_TIMESTAMP(), TIMESTAMP(NULL) from sample.PERSONA_test2
where IdPersona = "A";

#replace the above table to recriate the ARRAY<STRUCT<>>
CREATE OR REPLACE TABLE sample.DIM_PERSONA_FINAL_test2 AS(
SELECT t1.*, t2._loadingDate,t2._lastModifiedDate 
FROM( SELECT a.IdPersona, 
       a.Status, 
       ARRAY_AGG(STRUCT( Properties.Id as Id, Properties.Value as Value, Properties._loadingDate ,
Properties._lastModifiedDate AS _lastModifiedDate)) AS Properties
FROM sample.DIM_PERSONA_test2 a, UNNEST(Properties) as Properties
GROUP BY 1,2
ORDER BY a.IdPersona)t1 LEFT JOIN sample.DIM_PERSONA_test2 t2 USING(IdPersona)
)

而输出,

在此处输入图像描述

请注意,在更新ARRAY<STRUCT<>>时,值被包装在[()]中。最后,注意有两个IdPersona="A"因为_loadingDate是必需的,所以它不能为NULL并且由于CURRENT_TIMESTAMP( ),这个字段有两个不同的值。因此,两个不同的记录。

于 2020-08-19T08:36:05.973 回答