5

我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。Orchestrator 的工作是从 API 中检索一个列表,并针对该列表中的每个元素,使用一些参数触发工作 DAG。

我将这两个工作流程分开的原因是我希望能够仅重播失败的“工作”工作流程(如果一个失败,我不想重播所有工作实例)。

我能够让事情正常进行,但现在我看到了监控是多么困难,因为我的 task_id 对所有人都是一样的,所以我决定根据“orchestrator”工作流从 API 检索到的值来使用动态 task_id。

但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望这个工作:

with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
    name = context['dag_run'].name
    hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
    bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)

    hello_world >> bye

但我无法定义这个“上下文”对象。但是,我可以从运算符(例如 PythonOperator 和 BashOperator)访问它。

是否可以在操作员之外检索 dag_run 对象?

4

2 回答 2

1

我认为目前这不容易。例如,作为工作程序运行过程的一部分,除了在哪里可以找到 DAG 之外,在没有提供任何 TaskInstance 上下文的情况下检索 DAG:https ://github.com/apache/incubator-airflow/blob/f18e2550543e455c9701af0995bc393ee6a97b47/airflow/bin/cli .py#L353

稍后注入上下文:https ://github.com/apache/incubator-airflow/blob/c5f1c6a31b20bbb80b4020b753e88cc283aaf197/airflow/models.py#L1479

DAG将run_id是存储此信息的好地方。

于 2018-08-28T15:58:30.190 回答
1

是的,有可能我为我尝试和工作的是

在下面的代码块中,我试图展示所有可能的方式来使用传递的配置,直接传递给不同的操作员

pyspark_task = DataprocSubmitJobOperator(
    task_id="task_0001",
    job=PYSPARK_JOB,
    location=f"{{{{dag_run.conf.get('dataproc_region','{config_data['cluster_location']}')}}}}",
    project_id="{{dag_run.conf['dataproc_project_id']}}",
    gcp_conn_id="{{dag_run.conf.gcp_conn_id}}"
)

所以你可以像这样使用它

"{{dag_run.conf.field_name}}" or "{{dag_run.conf['field_name']}}"

或者如果您想使用一些默认值以防配置字段是可选的,

f"{{{{dag_run.conf.get('field_name', '{local_variable['field_name_0002']}')}}}}"
于 2021-10-05T12:27:29.807 回答