0

我使用气流 2 任务流 API 创建了一个 DAG:

with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
    cf = collect_files()
    upi = update_process_info(cf)
    for i in range(0, max_parallel_plot_tasks):
        plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi

如何使用 Taskflow API 摆脱“collect_files”与“update_process_info”的连接?

图形:

图形

问候奥利

4

1 回答 1

1

尝试类似:

with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
    cf = collect_files()
    upi = None
    for i in range(0, max_parallel_plot_tasks):
        if not upi:
            upi = update_process_info(plot_files(cf, i, int(max_parallel_plot_tasks)))
        else:
            plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi
于 2021-02-26T08:14:09.213 回答