我正在使用胶水将 3 个数据集从 S3 加载到 Aurora Postgres Serverless。由于 spark 和 jdbc 不允许我将 json stings 直接转换为 jsonb 数据类型,因此我在 Postgres 端创建了 3 个“暂存”表。我有一个第四个表 - etl_log - 我已经附加了一个触发函数,见下文在写入时触发。
该函数基本上采用每个这些临时表中的任何内容并将其放在它打算所在的数据类型中的目标中,并为下一次 etl 加载截断临时表。ETL 日志跟踪写入的内容和时间。
到目前为止一切都很好。当任何一个临时表中有数据并被调用时,此函数将按预期工作。但是,如果不止一个表中有记录,那么它似乎表现出两种行为之一:
- 它将数据加载到其中一个目标表中,同时截断两个临时表或
- 即使只调用了一个,它也会从所有临时表加载数据。(第二个表可能正在加载,所有执行程序尚未完成,因此在暂存中留下了一些记录)。
如何在它们独立运行的函数中隔离这些不同的块。我不太担心第二种情况,我已经编写了一个脚本(在 etlside -Glue 上)来计算所有 3 个临时表并写入计数(即使在任何临时表中为零),但我真的很感激关于第一个场景
尽管暂存中有有效的[时间戳存在并且至少初始或联系ID存在]记录,但目标负载的问题没有发生,但函数执行时在具有以下代码的联系流表周围截断了暂存中心:
INSERT INTO contact_flow_logs (
id,
message,
timestamp,
initialcontactid,
contactflowmodule_type,
weeknum,
contactid
)
SELECT
cfl.id,
cfl.message::jsonb,
cfl.timestamp::timestamptz,
cfl.message::jsonb ->> 'InitialContactId' AS initialcontactid,
cfl.message::jsonb ->> 'ContactFlowModuleType' AS contactflowmodule_type,
date_part('week', cfl.timestamp::timestamptz) AS weeknum,
cfl.message::jsonb ->> 'ContactId' AS contactid
FROM contact_flow_logs_staging AS cfl
WHERE COALESCE(cfl.message::jsonb ->> 'InitialContactId', cfl.message::jsonb ->> 'ContactId') IS NOT NULL AND cfl.timestamp is not null;
请参阅下面的完整功能。
提前致谢。
BEGIN
INSERT INTO agents (
currentagentsnapshot,
previousagentsnapshot,
agentarn,
eventid,
eventtimestamp,
eventtype,
instancearn)
SELECT
agent_stg.currentagentsnapshot::jsonb AS currentagentsnapshot,
agent_stg.previousagentsnapshot::jsonb AS previousagentsnapshot,
agent_stg.agentarn::text AS agentarn,
agent_stg.eventid::text AS eventid,
agent_stg.eventtimestamp::timestamptz AS eventtimestamp,
agent_stg.eventtype::text AS eventtype,
agent_stg.instancearn::text AS instancearn
FROM agents_staging AS agent_stg
WHERE agent_stg.agentarn IS NOT NULL;
TRUNCATE agents_staging;
INSERT INTO contact_flow_logs (
id,
message,
timestamp,
initialcontactid,
contactflowmodule_type,
weeknum,
contactid
)
SELECT
cfl.id,
cfl.message::jsonb,
cfl.timestamp::timestamptz,
cfl.message::jsonb ->> 'InitialContactId' AS initialcontactid,
cfl.message::jsonb ->> 'ContactFlowModuleType' AS contactflowmodule_type,
date_part('week', cfl.timestamp::timestamptz) AS weeknum,
cfl.message::jsonb ->> 'ContactId' AS contactid
FROM contact_flow_logs_staging AS cfl
WHERE COALESCE(cfl.message::jsonb ->> 'InitialContactId', cfl.message::jsonb ->> 'ContactId') IS NOT NULL AND cfl.timestamp is not null;
TRUNCATE contact_flow_logs_staging;
INSERT INTO contact_trace_records (
agent,
agentconnectionattempts,
attributes,
channel,
connectedtosystemtimestamp,
contactid,
customerendpoint,
disconnecttimestamp,
initialcontactid,
initiationmethod,
initiationtimestamp,
instancearn,
lastupdatetimestamp,
nextcontactid,
previouscontactid,
ctrqueue,
recording,
systemendpoint,
transfercompletedtimestamp,
transferredtoendpoint,
attemptsmade,
arrivaltimestamp,
attemptendingtimestamp,
contactdetails,
ctrreferences,
disconnectreason
)
SELECT
ctrs.agent::jsonb,
ctrs.agentconnectionattempts::TEXT,
ctrs.attributes::jsonb,
ctrs.channel::TEXT,
ctrs.connectedtosystemtimestamp::TIMESTAMPTZ,
ctrs.contactid::TEXT,
ctrs.customerendpoint::jsonb,
ctrs.disconnecttimestamp::TIMESTAMPTZ,
ctrs.initialcontactid::TEXT,
ctrs.initiationmethod::TEXT,
ctrs.initiationtimestamp::TIMESTAMPTZ,
ctrs.instancearn::TEXT,
ctrs.lastupdatetimestamp::TIMESTAMPTZ,
ctrs.nextcontactid::TEXT,
ctrs.previouscontactid::TEXT,
ctrs.ctrqueue::jsonb,
ctrs.recording::jsonb,
ctrs.systemendpoint::jsonb,
ctrs.transfercompletedtimestamp::TIMESTAMPTZ,
ctrs.transferredtoendpoint::jsonb,
ctrs.attemptsmade::TEXT,
ctrs.arrivaltimestamp::TIMESTAMPTZ,
ctrs.attemptendingtimestamp::TIMESTAMPTZ,
ctrs.contactdetails::TEXT,
ctrs.ctrreferences::TEXT,
ctrs.disconnectreason::TEXT
FROM contact_trace_records_staging AS ctrs
WHERE NOT (ctrs.contactid IS NULL AND ctrs.initialcontactid IS NULL);
TRUNCATE contact_trace_records_staging;
RETURN new;
结尾