我有一个定义 DAG 对象的文件:
dags/my_dag.py
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'pilota',
'depends_on_past': False,
'start_date': datetime(2019, 10, 1),
'email': ['some@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
}
bts_dag = DAG(
'hist_data_etl', default_args=default_args, schedule_interval='@once')
然后在另一个文件中,我导入创建的 dag 并定义我的任务:
from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col
NUM_ENGINES = 4
template_command = '''
ipcluster start n {{ params.cluster }}
sleep 5
'''
start_iparallel_cluster = BashOperator(
task_id='start_cluster',
bash_command=template_command,
retries=3,
params={'params': NUM_ENGINES},
dag=bts_dag)
import_hist_bts_data_task = PythonOperator(
task_id='fetch_transform_hist_col',
python_callable=fetch_and_transform_bts_data_col,
op_kwargs={
'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
},
dag=bts_dag)
start_iparallel_cluster >> import_hist_bts_data_task
完整性检查:
$ airflow list_dags
产量:
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl
然而
$ airflow list_tasks hist_data_etl
不输出我的任何任务。不知何故,气流没有将任务注册为属于我在另一个文件中定义的 DAG。
请帮忙 :)