1

我有一个定义 DAG 对象的文件:

dags/my_dag.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['some@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

bts_dag = DAG(
    'hist_data_etl', default_args=default_args, schedule_interval='@once')

然后在另一个文件中,我导入创建的 dag 并定义我的任务:

from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start n {{ params.cluster }}
    sleep 5
'''

start_iparallel_cluster = BashOperator(
    task_id='start_cluster',
    bash_command=template_command,
    retries=3,
    params={'params': NUM_ENGINES},
    dag=bts_dag)


import_hist_bts_data_task = PythonOperator(
    task_id='fetch_transform_hist_col',
    python_callable=fetch_and_transform_bts_data_col,
    op_kwargs={
        'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
    },
    dag=bts_dag)

start_iparallel_cluster >> import_hist_bts_data_task

完整性检查:

$ airflow list_dags

产量:

 -------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl

然而

$ airflow list_tasks hist_data_etl

不输出我的任何任务。不知何故,气流没有将任务注册为属于我在另一个文件中定义的 DAG。

请帮忙 :)

4

1 回答 1

2
  • 由于dag 文件解析在 Airflow 中的工作方式,我不希望这能工作
  • 即使我不完全了解内部结构,但 Airflow 会生成子进程来解析 dag 定义文件(由某些特征标识的文件)。每个进程解析不同的文件子集 => 不同的文件很可能由不同的进程处理
  • 我相信在您的实现中,解析文件的逻辑顺序(首先解析 dag 文件,然后是任务文件)没有保留,因此事情不起作用

但是,通过对您的方法进行一些修改,您可以使其正常工作

第一个文件

# dag_object_creator.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['some@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

def create_dag_object():
    bts_dag = DAG(dag_id='hist_data_etl',
                  default_args=default_args,
                  schedule_interval='@once')
    return bts_dag

第二个文件

# tasks_creator.py

# this import statement is problematic
# from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start n {{ params.cluster }}
    sleep 5
'''

def create_bash_task(bts_dag):
    start_iparallel_cluster = BashOperator(
        task_id='start_cluster',
        bash_command=template_command,
        retries=3,
        params={'params': NUM_ENGINES},
        dag=bts_dag)
    return start_iparallel_cluster


def create_python_task(bts_dag):
    import_hist_bts_data_task = PythonOperator(
        task_id='fetch_transform_hist_col',
        python_callable=fetch_and_transform_bts_data_col,
        op_kwargs={
            'bucket': 'pilota-ml-raw-store', 'path': 'flights/', 'num_files': 1
        },
        dag=bts_dag)
    return import_hist_bts_data_task

第三个文件

# dag_definition_file.py

import dag_object_creator
import tasks_creator

# create dag object
# stuff from 'dag_object_creator.py' can be put here directly,
# i just broke down things for clarity
bts_dag = dag_object_creator.create_dag_object()

# create tasks
start_iparallel_cluster = tasks_creator.create_bash_task(bts_dag)
import_hist_bts_data_task = tasks_creator.create_python_task(bts_dag)

# chaining tasks
start_iparallel_cluster >> import_hist_bts_data_task

上述代码布局将强制执行以下行为

  • 前期进程仅开始解析dag_definition_file.py(跳过其他两个文件,因为在全局范围内没有创建“DAG”)

  • import语句被执行时,这些文件被解析

  • 当执行 dag / task 创建语句时,DAG & task 对象分别在全局范围内创建

因此一切都很好地到位,这个实现应该可以工作(未经测试,但基于轶事知识)


推荐阅读

于 2019-10-02T05:40:05.837 回答