1

可以在此处找到与此问题相关的原始代码。

我对移位运算符和set_upstream/set_downstream方法都在我在 DAG 中定义的任务循环中工作感到困惑。当DAG的主执行循环配置如下:

for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))

或者

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

该图如下所示(字母数字序列是用户 ID,也定义了任务 ID):

在此处输入图像描述

当我像这样配置 DAG 的主执行循环时:

for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))

或者

for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables

图表如下所示:

在此处输入图像描述

第二张图是我想要的/我期望根据我对文档的阅读产生的前两个代码片段。如果我想clear_tables在触发针对不同用户 ID 的一批数据解析任务之前先执行,我应该将其表示为clear_tables >> id_worker(uid)

编辑——这是完整的代码,自我发布最后几个问题以来已经更新,供参考:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  'start_date': datetime.now(),
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=id,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    clear_tables << preproc_task

在实施@LadislavIndra 的建议后,我继续对位移运算符进行相同的反向实施,以获得正确的依赖关系图。

更新@AshBerlin-Taylor 的回答是这里发生了什么。我假设 Graph View 和 Tree View 做同样的事情,但事实并非如此。这是id_worker(uid) >> clear_tables图表视图中的样子:

在此处输入图像描述

我当然不希望我的数据准备程序的最后一步是删除所有数据表!

4

2 回答 2

5

Airflow 中的树形视图与您(和我!)最初的想法相比是“倒退”的。在您的第一个屏幕截图中,它显示必须在“AAAG5608078M2”运行任务之前运行“clear_tables”。DAG 状态取决于每个 id worker 任务。因此,它不是任务顺序,而是状态链的树。如果这有任何意义的话。

(起初这可能看起来很奇怪,但这是因为 DAG 可以分支并重新分支。)

查看 Graph 视图的 dag 可能会更好。这个有箭头并以更直观的方式显示执行顺序。(虽然我现在确实发现树视图很有用。开始时不太清楚)

于 2017-07-28T16:04:33.677 回答
1

查看您的其他代码,这似乎get_id_creds是您的任务,而您正试图循环通过它,这会产生一些奇怪的交互。

一个可行的模式是:

clear_tables = MyOperator()

for uid in uid_list:
  my_task = MyOperator(task_id=uid)
  clear_tables >> my_task
于 2017-07-28T14:06:10.683 回答