2

我有一个迭代输入并将数据吐出到我已配置为上传到我创建的红移表的 AWS Firehose 的过程。一个问题是,有时行可能会重复,因为该过程需要重新评估数据。就像是:

Event_date, event_id, event_cost
2015-06-25, 123, 3
2015-06-25, 123, 4

http://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html

看那里,我想用新值替换旧行,比如:

insert into event_table_staging  
select event_date,event_id, event_cost from <s3 location>;

delete from event_table  
using event_table_staging  
where event_table.event_id = event_table_staging.event_id;

insert into target 
select * from event_table_staging;

delete from event_table_staging  
select * from event_table_staging;

是否可以执行以下操作:

Redshift columns: event_date,event_id,cost
copy event_table from <s3> 
(update event_table 
select c_source.event_date,c_source.event_id,c_source.cost from <s3 source> as c_source join event_table on c_source.event_id = event_table.event_id) 
CSV


copy event_table from <s3> 
(insert into event_table 
select c_source.event_date,c_source.event_id,c_source.cost from event_table left outer join<s3 source> as c_source join on c_source.event_id = event_table.event_id where c_source.event_id is NULL) 
CSV
4

2 回答 2

3

您不能直接从 COPY 进行合并。

但是,您的初始方法可以使用临时表包装在事务中,以暂存负载数据以获得最佳性能。

BEGIN
;
CREATE TEMP TABLE event_table_staging (
     event_date  TIMESTAMP  NULL
    ,event_id    BIGINT     NULL
    ,event_cost  INTEGER    NULL )
DISTSTYLE KEY
DISTKEY (event_id)
SORTKEY (event_id)
;
COPY event_table_staging  
FROM <s3 location>
COMPUDATE ON
;
UPDATE event_table  
SET    event_date = new.event_date
      ,event_cost = new.event_cost
FROM        event_table         AS trg
INNER JOIN  event_table_staging AS new
        ON  trg.event_id = new.event_id
WHERE COALESCE(trg.event_date,0) <> COALESCE(new.event_date,0)
  AND COALESCE(trg.event_cost,0) <> COALESCE(new.event_cost,0)
;
INSERT INTO event_table 
SELECT  event_date
       ,event_id  
       ,event_cost
FROM        event_table_staging AS new
LEFT JOIN   event_table         AS trg
       ON   trg.event_id = new.event_id
WHERE trg.event_id IS NULL
;
COMMIT
;

只要您使用事务并且更新总量相对较低(个位数百分比),这种方法实际上表现得非常好。唯一需要注意的是,您的目标需要VACUUM定期进行编辑 - 每月一次对我们来说就足够了。

我们每小时对几亿行范围内的几张表执行此操作,即将亿行合并成亿行。对合并表的用户查询仍然表现良好。

于 2016-02-27T15:19:30.127 回答
2

Redshift 经过优化,可以以经济高效的方式处理海量数据,您需要改变对其他数据库的数据和数据库的一些想法。

主要概念是您不应该在 Redshift 中更新数据。您应该将 Redshift 中的数据视为“日志”。您可以将函数用作 INSERT 或 UPDATE,但它们会极大地限制您可以处理的数据量。

您可以通过多种方式处理重复项:

  • 您可以通过管理您正在处理的所有 ID 的一些内存查找表(例如在ElastiCache上的 Redis 中)来防止写入重复,如果您已经处理了记录,则忽略它

  • 您可以在 Redshift 中保留重复项,并使用WINDOW函数处理这些记录,该函数将只获取其中一条记录(例如LAST_VALUE)。

  • 您可以在 Redshift 中获取原始事件并在对数据库的查询中进行聚合,而不是将其作为预处理进行。此模式还可以灵活地更改聚合数据的方式。通过这些聚合,Redshift 可以非常快,并且几乎不需要预聚合。

如果您仍想在 Redshift 中拥有“干净”和聚合的数据,您可以使用具有正确聚合或 WINDOW 函数的一些 SQL 查询来卸载该数据,删除旧表并将数据复制回 Redshift。

于 2016-02-28T02:54:13.563 回答