0

处理需要我从一个 postgres 表中提取数据并将数据更新到另一个环境中的另一个 Postgres 表的 ETL 过程(相同的列名)。目前,我正在 Windows EC2 实例中运行 python 作业,并且我正在使用pangres upsert 库来更新现有行并插入新行。

但是,我的组织希望我在 AWS 上的托管 Apache Airflow 中移动 python ETL 脚本。

我一直在学习 DAG,大部分教程和文章都是关于使用钩子或运算符从 postgres 表中查询数据。

但是,我希望了解如何使用表 B 中的新记录增量更新现有表 A(即 upsert)(并忽略/覆盖现有的匹配行)。

任何解释如何执行这个简单任务的代码块 (DAG) 都会非常有帮助。

4

1 回答 1

0

在 Apache Airflow 中,操作是使用运算符完成的。您可以将任何 Python 代码打包到运算符中,但最好的办法是始终使用预先存在的开源运算符(如果已经存在)。Postgres 有一个运算符(https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html)。

很难提供一个完整的例子来说明你应该为你的情况编写什么,但听起来你在这里采取的最佳方法是获取 Python ETL 脚本中存在的任何 SQL 并将其与 Postgres 运算符一起使用. 我链接的文档应该是一个很好的例子。

他们演示了插入数据、读取数据,甚至创建表作为先决条件。就像在 Python 脚本中,行一次执行一个,在 DAG 中,运算符以特定顺序执行,具体取决于它们的连接方式,例如在他们的示例中:

create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date

在他们的示例中,在创建宠物表步骤成功之前不会填充宠物表,等等。

由于您的用例是将新数据从一个表复制到另一个表,因此我可以给您一些提示:

  • 使用计划的 DAG 批量复制数据。Airflow 并不打算用于许多小数据的流式传输系统。
  • 在 DAG 中使用 DAG 运行 ( https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html )的“逻辑日期”来了解运行应处理的数据间隔。这非常适合您的要求,即在每次运行期间只应复制新数据。如果您需要修复代码,它还会为您提供可重复的运行,然后在推动修复后重新运行每次运行(一次一批)。
于 2021-12-19T22:49:28.880 回答