0

嗨,感谢您的宝贵时间

  • 我正在尝试运行一个在“2021-06-08”和“2021-06-10”分区日期之间回填数据的进程。
  • 每个分区需要 10 天的数据
  • 我想每天为分区中的每个用户生成一行,只要他们在 10 天内有一行
  • 这意味着我希望用户 uX 和 uZ 在第 8、第 9 和第 10 行
  • 我希望最终输出为:
分区日期 用户 item_arry.daydate item_array.item item_array.score
21 年 8 月 6 日 uX 21 年 3 月 6 日 iA 0.2
21 年 6 月 6 日 IB 0.9
21 年 8 月 6 日 uZ 21 年 7 月 6 日 iA 0.9
21 年 9 月 6 日 uX 21 年 3 月 6 日 iA 0.2
21 年 6 月 6 日 IB 0.9
21 年 9 月 6 日 uZ 21 年 7 月 6 日 iA 0.9
21 年 10 月 6 日 uX 21 年 3 月 6 日 iA 0.2
21 年 6 月 6 日 IB 0.9
21 年 10 月 6 日 uZ 21 年 7 月 6 日 iA 0.9
21 年 10 月 6 日 IB 0.8
  • 我遇到的主要问题是在我做第二步的最后一步array_agg

  • 我真的希望能够在此步骤中进行重复数据删除

  • 如果您能够想到一个非常好的方法来做到这一点,即使是一个完整的重构也将不胜感激

  • 提前致谢 :)

CREATE TEMP FUNCTION get_missing_dates(daydate DATE, next_daydate DATE, max_days_forward INT64, to_date DATE) AS (
GENERATE_DATE_ARRAY(
  daydate,
  LEAST(
    DATE_SUB(
      IFNULL(
        IF(DATE_DIFF(next_daydate, daydate, DAY) <= max_days_forward, next_daydate, NULL),
        DATE_ADD(daydate, INTERVAL max_days_forward DAY)),
      INTERVAL 1 DAY)
    ,to_date)
  )
);


WITH 
my_data AS (
--user x
SELECT DATE('2021-06-01') AS daydate, 'uX' AS user, 'iA' AS item, 0.1 AS score UNION ALL
SELECT DATE('2021-06-03') AS daydate, 'uX' AS user, 'iA' AS item, 0.2 AS score UNION ALL
SELECT DATE('2021-06-03') AS daydate, 'uX' AS user, 'iB' AS item, 0.3 AS score UNION ALL
SELECT DATE('2021-06-06') AS daydate, 'uX' AS user, 'iB' AS item, 0.9 AS score UNION ALL
-- user y
SELECT DATE('2021-06-01') AS daydate, 'uZ' AS user, 'iA' AS item, 0.4 AS score UNION ALL
SELECT DATE('2021-06-02') AS daydate, 'uZ' AS user, 'iA' AS item, 0.6 AS score UNION ALL
SELECT DATE('2021-06-07') AS daydate, 'uZ' AS user, 'iA' AS item, 0.9 AS score UNION ALL
SELECT DATE('2021-06-10') AS daydate, 'uZ' AS user, 'iB' AS item, 0.8 AS score
),

find_next_day_on AS (
SELECT 
  daydate,
  user,
  ARRAY_AGG(
    STRUCT(
      daydate,
      item,
      score)) AS item_array,
  -- this produces a date indicating the next day the user was on
  LEAD(daydate, 1) OVER (PARTITION BY user ORDER BY daydate ASC) AS next_daydate
FROM my_data
-- this is the partition dates i would like to run for, with partitions requiring 10 days of data
WHERE daydate BETWEEN DATE_SUB('2021-06-08', INTERVAL 10 DAY) AND '2021-06-10'
GROUP BY 1, 2
),

broadcast_missing_days AS (
SELECT
  missing_days AS new_daydate,
  user,
  item_array
FROM find_next_day_on, 
  UNNEST(get_missing_dates(daydate,next_daydate,10,DATE('2021-06-10'))) AS missing_days
),
build_10day_array AS (
SELECT
  new_daydate,
  user,
  -- ??? IS THERE A WAY TO DEDUPE THIS ARRAY IN THIS STEP SO I GET THE LATEST SCORE FOR EACH ITEM???
  ARRAY_AGG(
    STRUCT(
    daydate,
    item,
    score
    )) OVER (PARTITION BY user ORDER BY UNIX_DATE(new_daydate) RANGE BETWEEN 9 PRECEDING AND 0 PRECEDING)
FROM broadcast_missing_days,
  UNNEST(item_array) AS ia
)
SELECT * FROM build_10day_array ORDER BY user, new_daydate
4

0 回答 0