12

我是气流新手。
我遇到了一个场景,其中父 DAG 需要将一些动态数字(比如说n)传递给子 DAG。
SubDAG 将使用此数字来动态创建n并行任务。

气流文档没有涵盖实现这一目标的方法。所以我探索了几种方法:

选项 - 1(使用 xcom 拉取)

我试图作为 xcom 值传递,但由于某种原因,SubDAG 没有解析为传递的值。

父 Dag 文件

def load_dag(**kwargs):
    number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
    dag_data = json.dumps({
        "number_of_runs": number_of_runs
    })
    return dag_data

# ------------------ Tasks ------------------------------
load_config = PythonOperator(
    task_id='load_config',
    provide_context=True,
    python_callable=load_dag,
    dag=dag)


t1 = SubDagOperator(
    task_id=CHILD_DAG_NAME,
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
    default_args=default_args,
    dag=dag,
)

子日期文件

def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None)

    variabe_names = {}

    for i in range(num_of_runs):
        variabe_names['task' + str(i + 1)] =  DummyOperator(
        task_id='dummy_task',
        dag=dag_subdag,
    )

    return dag_subdag

选项 - 2

我也尝试将number_of_runs其作为全局变量传递,但它不起作用。

选项 - 3

我们还尝试将此值写入数据文件。但子 DAG 正在抛出File doesn't exist error。这可能是因为我们正在动态生成这个文件。

有人可以帮我弄这个吗。

4

4 回答 4

2

我已经用选项 3 完成了。如果文件不存在,关键是返回一个没有任务的有效 dag。因此 load_config 将生成一个包含您的任务数量或更多信息的文件(如果需要)。您的 subdag 工厂看起来像:

def subdag(...):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    file_path = "/path/to/generated/file"
    if os.path.exists(file_path):
        data_file = open(file_path)
        list_tasks = data_file.readlines()
        for task in list_tasks:
            DummyOperator(
                  task_id='task_'+task,
                  default_args=args,
                  dag=sdag,
            )
    return sdag

在 dag 生成时,您将看到一个没有任务的 subdag。在 dag 执行时,在 load_config 完成后,你可以看到你动态生成的 subdag

于 2017-09-21T05:56:31.163 回答
0

如果您要写入的文件名不是动态的(例如,您为每个任务实例一遍又一遍地写入同一个文件),Jaime 的答案将起作用:

file_path = "/path/to/generated/file"

但是,如果您需要一个唯一的文件名,或者希望每个任务实例为并行执行的任务写入不同的内容,气流将不适用于这种情况,因为无法在模板之外传递执行日期或变量。看看这个帖子

于 2018-08-22T06:51:57.757 回答
0

看看我的答案 here,其中我描述了一种基于先前使用 xcoms 和 subdags 执行的任务的结果动态创建任务的方法。

于 2018-08-23T03:16:55.600 回答
0

如果您只是将调用更改xcom_pull为包含dag_id父 dag 的,选项 1 应该可以工作。默认情况下,xcom_pull调用将task_id 'load_config'在它自己的不存在的 dag 中查找。

因此将 x_com 调用宏更改为:

subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config', dag_id='" + PARENT_DAG_NAME + "' }}'" ),
于 2018-08-02T11:06:50.203 回答