我有一个 DAG,它在函数内执行雪花 sql 语句。SQL 是一个插入语句,将记录从一个表插入到另一个表。插入时,源表中的一些记录有问题,没有插入到目标表中。这导致插入过程失败。
要求:
我想跳过错误记录并将下一条记录从源表加载到目标表。
我想将错误记录写入 GCS 存储桶中的文件。
from airflow import DAG from airflow.operators import python_operator from datetime import datetime import logging from my_folder import param_file from snowflake.connector.errors import ProgrammingError logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) args = { "owner": "test_dag", "start_date": datetime(2021, 3, 2, 10, 1), 'depends_on_past': False, 'email': ['mymailid@example.com'], 'email_on_failure': True } cur = param_file.cur ## param_file is a python file that contains the connection details to Snowflake query = """insert into employees(first_name, last_name, workphone, city,postal_code) select contractor_first,contractor_last,worknum,null,zip_code from contractors""" def run_sql(**context): try: cur.execute(query) except ProgrammingError as db_ex: print(f"Programming error: {db_ex}") with DAG( dag_id="EXCEPTION_testing", schedule_interval=None, max_active_runs=1,catchup=False,default_args=args) as dag: task1= python_operator.PythonOperator( task_id='snowflake_query_execution', python_callable=run_sql, dag=dag)
我曾尝试使用 try 和 except,但任务是为错误记录引发异常,并且不处理源表中的下一个记录。
任何帮助表示赞赏。