I am trying to build a DAG on AWS MWAA that exports data from Postgres (RDS) to S3, but as soon as MWAA tries to parse all of the queries in my task it runs into trouble. In total it will export 385 tables, but the DAG gets stuck in the running state and never starts my tasks.
Basically, the process will:
- Load the table schema
- Rename certain columns (see the sketch right after this list)
- Export the data to S3
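The "rename" step goes through helper.masking_pii, which is not shown in the snippets below. A minimal sketch of what it presumably does, where PII_COLS, the column names and the md5 masking expression are all assumptions made purely for illustration:

PII_COLS = ["email", "phone_number"]  # assumed values, for illustration only

def masking_pii(columns, pii_columns=PII_COLS):
    """Hypothetical sketch: build the select list, masking PII columns."""
    masked = []
    for col in columns:
        if col in pii_columns:
            # hash the value but keep the original column name in the output
            masked.append(f"md5({col}::text) as {col}")
        else:
            masked.append(col)
    return masked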
Function
def export_to_s3(dag, conn, db, pg_hook, export_date, s3_bucket, schemas):
    tasks = []
    run_queries = []
    for schema, features in schemas.items():
        # use the explicit table list if one is configured, otherwise discover it
        t = features.get("tables")
        if t:
            tables = t
        else:
            tables = helper.get_tables(pg_hook, schema).table_name.tolist()
        is_full_export = features.get("full")
        for table in tables:
            # look up the column list so PII columns can be masked/renamed
            columns = helper.get_table_schema(
                pg_hook, table, schema
            ).column_name.tolist()
            masked_columns = helper.masking_pii(columns, pii_columns=PII_COLS)
            masked_columns_str = ",\n".join(masked_columns)
            if is_full_export:
                statement = f'select {masked_columns_str} from {db}.{schema}."{table}"'
            else:
                statement = f'select {masked_columns_str} from {db}.{schema}."{table}" order by random() limit 10000'
            s3_bucket_key = export_date + "_" + schema + "_" + table + ".csv"
            # one aws_s3.query_export_to_s3 call per table
            sql_export = f"""
            SELECT * from aws_s3.query_export_to_s3(
                '{statement}',
                aws_commons.create_s3_uri(
                    '{s3_bucket}',
                    '{s3_bucket_key}',
                    'ap-southeast-2'),
                options := 'FORMAT csv, DELIMITER $$|$$'
            )""".strip()
            run_queries.append(sql_export)
def get_table_schema(pg_hook, table_name, table_schema):
    """Gets the schema details of a given table in a given schema."""
    query = """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = '{0}'
        AND table_name = '{1}'
        order by ordinal_position
    """.format(table_schema, table_name)
    df_schema = pg_hook.get_pandas_df(query)
    return df_schema


def get_tables(pg_hook, schema):
    """Lists the base tables of a given schema (excluding _sdc_rejected)."""
    query = """
        select table_name from information_schema.tables
        where table_schema = '{}' and table_type = 'BASE TABLE' and table_name != '_sdc_rejected'
    """.format(schema)
    df_schema = pg_hook.get_pandas_df(query)
    return df_schema
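Both helpers go through pg_hook.get_pandas_df, i.e. a live query against RDS each time they are called, and export_to_s3 calls them while the DAG file is being parsed, so building run_queries for the 385 tables means roughly one schema query per table at parse time. A standalone sketch of that call pattern, assuming a hypothetical connection id my_rds and the public schema (the import path may differ between Airflow versions):

from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id="my_rds")  # hypothetical connection id
tables = get_tables(pg_hook, "public").table_name.tolist()  # one query
for table in tables:
    # one additional query per table, all executed during DAG parsing
    columns = get_table_schema(pg_hook, table, "public").column_name.tolist()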
Task
    task = PostgresOperator(
        sql=run_queries,
        postgres_conn_id=conn,
        task_id="export_to_s3",
        dag=dag,
        autocommit=True,
    )
    tasks.append(task)
    return tasks
Airflow list_dags output
DAGS
-------------------------------------------------------------------
mydag
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 1
Total task number: 3
DagBag parsing time: 159.94030800000002
-----------------------------------------------------+--------------------+---------+----------
file | duration | dag_num | task_num
-----------------------------------------------------+--------------------+---------+----------
/mydag.py | 159.05215199999998 | 1 | 3
/ActivationPriorityCallList/CallList_Generator.py | 0.878734 | 0 | 0
/ActivationPriorityCallList/CallList_Preprocessor.py | 0.00744 | 0 | 0
/ActivationPriorityCallList/CallList_Emailer.py | 0.001154 | 0 | 0
/airflow_helperfunctions.py | 0.000828 | 0 | 0
-----------------------------------------------------+--------------------+---------+----------
Observation
If I only allow a single table to be loaded in the task, it works fine, but it fails when all tables are allowed to load. The behaviour is the same when Airflow is run from Docker pointing at the same RDS instance.
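To reproduce the parse-time measurement outside of MWAA, one option is to load the DagBag directly and time it. A minimal sketch, assuming the DAG files sit in a local dags/ folder:

import time
from airflow.models import DagBag

start = time.time()
bag = DagBag(dag_folder="dags")  # assumed path; point it at your DAG folder
print("DAGs loaded:", list(bag.dags))
print("Import errors:", bag.import_errors)
print(f"Parse time: {time.time() - start:.2f}s")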