0

我有一个 DAG,它在函数内执行雪花 sql 语句。SQL 是一个插入语句,将记录从一个表插入到另一个表。插入时,源表中的一些记录有问题,没有插入到目标表中。这导致插入过程失败。

要求:

  1. 我想跳过错误记录并将下一条记录从源表加载到目标表。

  2. 我想将错误记录写入 GCS 存储桶中的文件。

     from airflow import DAG
     from airflow.operators import python_operator
     from datetime import datetime
     import logging
     from my_folder import param_file
     from snowflake.connector.errors import  ProgrammingError
    
     logging.basicConfig(level=logging.INFO)
     logging.basicConfig(level=logging.DEBUG)
     logger = logging.getLogger(__name__)
    
     args = {
         "owner": "test_dag",
         "start_date": datetime(2021, 3, 2, 10, 1),
         'depends_on_past': False,
         'email': ['mymailid@example.com'],
         'email_on_failure': True
     }
     cur = param_file.cur ## param_file is a python file that contains the connection details to Snowflake
     query = """insert into employees(first_name, last_name, workphone, city,postal_code)
       select contractor_first,contractor_last,worknum,null,zip_code from contractors"""
    
     def run_sql(**context):
         try:
             cur.execute(query)
         except ProgrammingError as db_ex:
             print(f"Programming error: {db_ex}")
    
     with DAG(
         dag_id="EXCEPTION_testing", schedule_interval=None, max_active_runs=1,catchup=False,default_args=args) as dag:
    
         task1= python_operator.PythonOperator(
         task_id='snowflake_query_execution',
         python_callable=run_sql,
         dag=dag)        
    

我曾尝试使用 try 和 except,但任务是为错误记录引发异常,并且不处理源表中的下一个记录。

任何帮助表示赞赏。

4

0 回答 0