我正在使用 PySpark 编写一个大批量作业,该作业对 200 个表进行 ETL 并加载到 Amazon Redshift 中。这 200 个表是从一个输入数据源创建的。因此,只有当数据成功加载到 ALL 200 个表中时,批处理作业才会成功。批处理作业每天运行,同时将每个日期的数据附加到表中。
对于容错性、可靠性和幂等性,我当前的工作流程如下:
- 使用临时表。创建临时 Redshift 表
CREATE TEMP TABLE LIKE <target_table>
- 将数据转换并加载到临时表中。
- 对 200 个其他表重复 1-2。
- 开始
BEGIN
交易。 - 使用将临时表数据复制到目标表中
INSERT INTO <taget_table> SELECT * FROM <staging_table>
END
交易DROP
所有临时表。
这样我可以保证如果第 3 步失败(这更有可能),我不必担心从原始表中删除部分数据。相反,我将简单地重新运行整个批处理作业,因为临时表在 JDBC 断开连接后被丢弃。
虽然它解决了大部分问题,但它并不优雅、不合时宜,而且会耗费额外的时间。我想如果 Spark 和/或 Redshift 提供标准工具来解决 ETL 世界中这个非常常见的问题。
谢谢