0
import logging
import pandas as pd
import boto3

from datetime import datetime
from airflow import DAG, settings
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.models import Variable, Connection
from airflow.utils.trigger_rule import TriggerRule
from settings import settings_manager
from config.misc import misc_config

logger = logging.getLogger(__name__)
# Enumerates every table in <database>.STAGING — presumably executed by the
# list_existing_tables task (its body is elided in this excerpt; TODO confirm).
list_tables_query = f"""SHOW TABLES IN SCHEMA {settings_manager.SNOWFLAKE_DATABASE}.STAGING"""
# Name of the Airflow Variable ('True'/'False') meant to control whether
# existing staging tables are dropped and rebuilt.
overwrite_variable_name = 'overwrite_staging_tables'

# Manually-triggered DAG (schedule_interval=None) that (re)creates staging
# tables for every configured S3 source/file pair.
dag = DAG("v4_functional_table_creation", schedule_interval=None,
       start_date=datetime(2019, 2, 20))


# No-op entry point; anchors the start of the graph.
start = DummyOperator(task_id='start',
                   dag=dag)


def print_str(file):
    """Write *file* to the module logger at INFO level.

    Generic task callable used as a placeholder (e.g. for the drop tasks).
    """
    logger.log(logging.INFO, file)


def list_existing_tables():
 """Return the names of tables currently present in the STAGING schema.

 Body elided in this excerpt — presumably runs `list_tables_query` via
 SnowflakeHook and returns upper-cased table names (the caller compares
 with table_name.upper()); TODO confirm against the full source.
 """
 # left out for brevity
 # NOTE(review): `table_list` is not defined in the visible code.
 return table_list


def does_table_exists(file, table_name, list_of_tables):
    """Branch callable: choose the next task based on table existence.

    Returns the overwrite-check task_id when `table_name` (upper-cased)
    is already in `list_of_tables`, otherwise the get-latest-file task_id.
    """
    already_created = table_name.upper() in list_of_tables
    if already_created:
        return f"overwrite_check_for_{table_name}"
    return f"get_latest_{file}_file"


def overwrite_check(source_name, table_name):
    """Branch callable: decide whether an existing staging table is dropped.

    Reads the 'overwrite_staging_tables' Airflow Variable (expected to be
    the string 'True' or 'False'; defaults to 'False' when unset).

    :param source_name: S3 source name (kept for interface compatibility;
        `table_name` already embeds it as a prefix).
    :param table_name: fully-prefixed staging table name, f"{source}_{file}".
    :return: task_id of the drop task when overwriting, otherwise "end".
    """
    # BUG FIX 1: the flag was hard-coded to 'False', making the drop branch
    # unreachable; the module-level `overwrite_variable_name` Variable was
    # declared but never read.
    overwrite = Variable.get(overwrite_variable_name, default_var='False')
    if overwrite == 'True':
        # BUG FIX 2: previously returned f"drop_{source_name}.{table_name}",
        # which matches no task — the drop task is registered with
        # task_id=f"drop_{table_name}", so the branch could never resolve.
        return f"""drop_{table_name}"""
    return "end"


def create_table(table_name, file_path):
    """Log that *table_name* is being created from *file_path*.

    Placeholder task callable — currently only records the intent.
    """
    # Lazy %-style args: formatting happens only if INFO is enabled;
    # the emitted message is identical to the original f-string.
    logger.info("creating table %s using %s file.", table_name, file_path)


def get_latest_uploaded_file(file, source):
 """
 Given a s3 prefix, returns path of the latest uploaded file.
 """
 # left out for brevity
 # NOTE(review): `latest` is not defined in the visible code — presumably
 # an S3 object summary dict selected by LastModified; returns '' when it
 # carries no 'Key'.  TODO confirm against the full source.
 return latest.get('Key', '')


# NOTE(review): this assignment shadows the list_existing_tables() function
# defined above — python_callable captures the function object before the
# name is rebound to the operator, so it works, but renaming one of the two
# would make the module easier to read.
list_existing_tables = PythonOperator(task_id="list_existing_tables",
                                   python_callable=list_existing_tables,
                                   dag=dag)

# Shared terminal task; every per-table chain (including the "no overwrite"
# branch) converges here.
end = DummyOperator(task_id='end',
                 dag=dag)

start >> list_existing_tables

# One branching sub-graph per (source, file) pair:
#
#   list_existing_tables
#     -> check_<table>_exists  (branch)
#          -> overwrite_check_for_<table>  (branch) -> drop_<table> | end
#          -> get_latest_<file>_file -> create_<table> -> end
for source in misc_config.get('s3_sources_to_parse'):
 # The Variable stores a stringified list, e.g. "['a', 'b']"; slicing off
 # the outer two characters and splitting on ',' recovers the raw entries,
 # which are then stripped of spaces/quotes below.
 file_list_str = Variable.get(f"""file_list_{source}""")
 file_list_str = file_list_str[2:-2]
 file_list = file_list_str.split(',')

 for file_str in file_list:
     file = file_str.strip(" ").strip('"').strip("'")
     table_name = f"""{source}_{file}"""

     # Branches to overwrite_check_for_<table> when the table already
     # exists, otherwise straight to get_latest_<file>_file.
     check_table_exists = BranchPythonOperator(task_id=f"""check_{table_name}_exists""",
                                               python_callable=does_table_exists,
                                               op_kwargs={'table_name': table_name,
                                                          'list_of_tables': list_existing_tables.output,
                                                          'file': file},
                                               dag=dag)

     # Branches to the drop task or to 'end'.  NOTE(review): the string the
     # callable returns must exactly match the drop task_id declared below
     # (f"drop_{table_name}") or the branch cannot resolve its target.
     check_overwrite_condition = BranchPythonOperator(task_id=f"""overwrite_check_for_{table_name}""",
                                                      python_callable=overwrite_check,
                                                      op_kwargs={'source_name': source,
                                                                 'table_name': table_name},
                                                      dag=dag)

     # NOTE(review): per Airflow's trigger-rule semantics,
     # 'none_failed_or_skipped' fires when no direct upstream failed AND at
     # least one succeeded.  check_table_exists is a direct upstream of this
     # task and succeeds even when it branches to the overwrite path, so the
     # branch's "skip" is overridden and this task runs anyway — which is why
     # get_latest_*_file executes when you expect it to be skipped.  Consider
     # routing each path through a per-table join task instead of giving this
     # task two upstreams combined with this trigger rule.
     get_latest_file = PythonOperator(task_id=f"""get_latest_{file}_file""",
                                      python_callable=get_latest_uploaded_file,
                                      op_kwargs={'file': file,
                                                 'source': source},
                                      trigger_rule='none_failed_or_skipped',
                                      dag=dag)

     # Placeholder drop task (only logs); reached via the overwrite branch.
     drop_table = PythonOperator(task_id=f"""drop_{table_name}""",
                                 python_callable=print_str,
                                 op_kwargs={'file': f"""dropping_{table_name}"""},
                                 trigger_rule='none_failed_or_skipped',
                                 dag=dag)

     # Consumes get_latest_file's XCom as the source file path.
     create_table_task = PythonOperator(task_id=f"""create_{table_name}""",
                                        python_callable=create_table,
                                        op_kwargs={'table_name': table_name,
                                                   'file_path': get_latest_file.output},
                                        dag=dag)

     # Wiring: note get_latest_file ends up with two direct upstreams
     # (check_table_exists and drop_table), and 'end' is shared across all
     # (source, file) pairs.
     list_existing_tables >> check_table_exists >> [check_overwrite_condition, get_latest_file]
     check_overwrite_condition >> [drop_table, end]
     drop_table >> get_latest_file >> create_table_task >> end

对于在上面声明的 DAG(参考随附的屏幕截图),我希望 `get_latest_waffle_switch_file` 和 `create_sos_waffle_switch` 这两个任务被跳过。我不明白 `get_latest_waffle_switch_file` 任务为何会被触发、又是如何被触发的。

我认为问题应该出在 DAG 任务之间依赖关系的声明方式上。

任何指点都会非常有帮助!提前致谢 _/\_ [1]: https://i.stack.imgur.com/eUUbh.png

4

0 回答 0