0

气流图任务分支从未运行,抱怨“数据库中不存在任务实例”,但可以在图中看到。

我有一个气流图,其条件分支定义如下

class BranchFlags(Enum):
    yes = "yes"
    no = "no"
...
for table in list_of_tables # type list(dict)
    task_1 = BashOperator(
        task_id='task_1_%s' % table["conf1"],
        bash_command='bash script1.sh %s' % table["conf1"],
        dag=dag)

    if table["branch_flag"] == BranchFlags.yes:
        consolidate = BashOperator(
            task_id='task_3_%s' % table["conf2"],
            bash_command='python %s/consolidate_parquet.py %s' % table["conf2"],
            dag=dag)

    task_3 = BashOperator(
        task_id='task_3_%s' % table["conf3"],
        bash_command='bash script3.sh %s' % table["conf3"],
        dag=dag)

    task_1 >> task_3
    if table["branch_flag"] == BranchFlags.yes:
        task_1 >> task_2

这是我实际代码中气流 UI 中的图表: 在此处输入图像描述

请注意,即使图的较长部分运行良好,但没有为应该分支的一个序列运行单独的分支。查看任务的日志时,我看到

*** 数据库中不存在任务实例

这对我来说很奇怪,因为从表面上看,调度程序 DB 看到了该任务,因为它确实出现在 Web UI 图中。不确定这里发生了什么并向 dag.py文件添加其他更改确实会显示在图表中,并在运行图表时由调度程序执行。并尝试查看任务任务实例详细信息会引发错误

任务 [dagname.task_3_qwerty] 目前似乎不存在

跑步airflow resetdb(正如我在其他帖子中看到的那样)对问题没有任何帮助。

请注意,其目的是短分支与较长分支同时运行(不是作为非此即彼的选择)。

任何人都知道为什么会发生这种情况或有一些调试技巧吗?

4

0 回答 0