嗨,感谢您的宝贵时间
- 我正在尝试运行一个在“2021-06-08”和“2021-06-10”分区日期之间回填数据的进程。
- 每个分区需要 10 天的数据
- 我想每天为分区中的每个用户生成一行,只要他们在 10 天内有一行
- 这意味着我希望用户 uX 和 uZ 在第 8、第 9 和第 10 行
- 我希望最终输出为:
分区日期 | 用户 | item_arry.daydate | item_array.item | item_array.score |
---|---|---|---|---|
21 年 8 月 6 日 | uX | 21 年 3 月 6 日 | iA | 0.2 |
21 年 6 月 6 日 | IB | 0.9 | ||
21 年 8 月 6 日 | uZ | 21 年 7 月 6 日 | iA | 0.9 |
21 年 9 月 6 日 | uX | 21 年 3 月 6 日 | iA | 0.2 |
21 年 6 月 6 日 | IB | 0.9 | ||
21 年 9 月 6 日 | uZ | 21 年 7 月 6 日 | iA | 0.9 |
21 年 10 月 6 日 | uX | 21 年 3 月 6 日 | iA | 0.2 |
21 年 6 月 6 日 | IB | 0.9 | ||
21 年 10 月 6 日 | uZ | 21 年 7 月 6 日 | iA | 0.9 |
21 年 10 月 6 日 | IB | 0.8 |
我遇到的主要问题是在我做第二步的最后一步
array_agg
我真的希望能够在此步骤中进行重复数据删除
如果您能够想到一个非常好的方法来做到这一点,即使是一个完整的重构也将不胜感激
提前致谢 :)
CREATE TEMP FUNCTION get_missing_dates(daydate DATE, next_daydate DATE, max_days_forward INT64, to_date DATE) AS (
GENERATE_DATE_ARRAY(
daydate,
LEAST(
DATE_SUB(
IFNULL(
IF(DATE_DIFF(next_daydate, daydate, DAY) <= max_days_forward, next_daydate, NULL),
DATE_ADD(daydate, INTERVAL max_days_forward DAY)),
INTERVAL 1 DAY)
,to_date)
)
);
WITH
my_data AS (
--user x
SELECT DATE('2021-06-01') AS daydate, 'uX' AS user, 'iA' AS item, 0.1 AS score UNION ALL
SELECT DATE('2021-06-03') AS daydate, 'uX' AS user, 'iA' AS item, 0.2 AS score UNION ALL
SELECT DATE('2021-06-03') AS daydate, 'uX' AS user, 'iB' AS item, 0.3 AS score UNION ALL
SELECT DATE('2021-06-06') AS daydate, 'uX' AS user, 'iB' AS item, 0.9 AS score UNION ALL
-- user y
SELECT DATE('2021-06-01') AS daydate, 'uZ' AS user, 'iA' AS item, 0.4 AS score UNION ALL
SELECT DATE('2021-06-02') AS daydate, 'uZ' AS user, 'iA' AS item, 0.6 AS score UNION ALL
SELECT DATE('2021-06-07') AS daydate, 'uZ' AS user, 'iA' AS item, 0.9 AS score UNION ALL
SELECT DATE('2021-06-10') AS daydate, 'uZ' AS user, 'iB' AS item, 0.8 AS score
),
find_next_day_on AS (
SELECT
daydate,
user,
ARRAY_AGG(
STRUCT(
daydate,
item,
score)) AS item_array,
-- this produces a date indicating the next day the user was on
LEAD(daydate, 1) OVER (PARTITION BY user ORDER BY daydate ASC) AS next_daydate
FROM my_data
-- this is the partition dates i would like to run for, with partitions requiring 10 days of data
WHERE daydate BETWEEN DATE_SUB('2021-06-08', INTERVAL 10 DAY) AND '2021-06-10'
GROUP BY 1, 2
),
broadcast_missing_days AS (
SELECT
missing_days AS new_daydate,
user,
item_array
FROM find_next_day_on,
UNNEST(get_missing_dates(daydate,next_daydate,10,DATE('2021-06-10'))) AS missing_days
),
build_10day_array AS (
SELECT
new_daydate,
user,
-- ??? IS THERE A WAY TO DEDUPE THIS ARRAY IN THIS STEP SO I GET THE LATEST SCORE FOR EACH ITEM???
ARRAY_AGG(
STRUCT(
daydate,
item,
score
)) OVER (PARTITION BY user ORDER BY UNIX_DATE(new_daydate) RANGE BETWEEN 9 PRECEDING AND 0 PRECEDING)
FROM broadcast_missing_days,
UNNEST(item_array) AS ia
)
SELECT * FROM build_10day_array ORDER BY user, new_daydate