我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。Orchestrator 的工作是从 API 中检索一个列表,并针对该列表中的每个元素,使用一些参数触发工作 DAG。
我将这两个工作流程分开的原因是我希望能够仅重播失败的“工作”工作流程(如果一个失败,我不想重播所有工作实例)。
我能够让事情正常进行,但现在我看到了监控是多么困难,因为我的 task_id 对所有人都是一样的,所以我决定根据“orchestrator”工作流从 API 检索到的值来使用动态 task_id。
但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望这个工作:
with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
name = context['dag_run'].name
hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)
hello_world >> bye
但我无法定义这个“上下文”对象。但是,我可以从运算符(例如 PythonOperator 和 BashOperator)访问它。
是否可以在操作员之外检索 dag_run 对象?