我正在调查从 Kafka -> SnowFlake/Kafka 连接器 -> SnowFlake 获取数据。不幸的是,连接器似乎只使用了两列(并将整个 JSON 有效负载放在一列中)。所以我创建了一个流/任务来定期将数据从登陆表复制到目标表(使用插入)。一切都运行良好,除了一旦登陆目标表就删除登陆表中的数据。使用流,我知道什么已经登陆。如何删除其余数据?截断似乎要快得多。我是否只是定期运行删除这些条目的删除任务?我还担心执行这些删除的仓库时间。谢谢
问问题
163 次
1 回答
0
对于多个语句(如插入、删除等)访问相同更改记录的用例,将它们包围在显式事务语句(Begin..Commit)中,这将锁定流。
您可以有一个附加列,如标志,使用 Begin 锁定流,使用流从暂存中插入到目标表,使用流执行第二次合并到暂存表以标记列标志。
https://docs.snowflake.com/en/user-guide/streams.html#examples
begin;
select * from <stream>;
insert into target_table select columns from <stream> where metadata$action='INSERT' and flag=0;
merge into staging_table st
using (
select column
from stream
where flag = 0) sc
on st.column=sc.column
when matched then update set st.flag=1;
commit;
delete from staging_table where flag=1;
于 2020-05-12T17:16:07.833 回答