气流图任务分支从未运行,抱怨“数据库中不存在任务实例”,但可以在图中看到。
我有一个气流图,其条件分支定义如下
class BranchFlags(Enum):
yes = "yes"
no = "no"
...
for table in list_of_tables # type list(dict)
task_1 = BashOperator(
task_id='task_1_%s' % table["conf1"],
bash_command='bash script1.sh %s' % table["conf1"],
dag=dag)
if table["branch_flag"] == BranchFlags.yes:
consolidate = BashOperator(
task_id='task_3_%s' % table["conf2"],
bash_command='python %s/consolidate_parquet.py %s' % table["conf2"],
dag=dag)
task_3 = BashOperator(
task_id='task_3_%s' % table["conf3"],
bash_command='bash script3.sh %s' % table["conf3"],
dag=dag)
task_1 >> task_3
if table["branch_flag"] == BranchFlags.yes:
task_1 >> task_2
请注意,即使图的较长部分运行良好,但没有为应该分支的一个序列运行单独的分支。查看任务的日志时,我看到
*** 数据库中不存在任务实例
这对我来说很奇怪,因为从表面上看,调度程序 DB 看到了该任务,因为它确实出现在 Web UI 图中。不确定这里发生了什么并向 dag.py文件添加其他更改确实会显示在图表中,并在运行图表时由调度程序执行。并尝试查看任务任务实例详细信息会引发错误
任务 [dagname.task_3_qwerty] 目前似乎不存在
跑步airflow resetdb(正如我在其他帖子中看到的那样)对问题没有任何帮助。
请注意,其目的是短分支与较长分支同时运行(不是作为非此即彼的选择)。
任何人都知道为什么会发生这种情况或有一些调试技巧吗?
