0

我正在尝试触发 dag,并从我的个人 UI 中包含一个编辑功能。这个想法是,用户将能够编辑任何现有的时间表,无论是具有相同的 dag_id 还是不同的 dag_id。因此,我已经包含了所有参数,例如时间戳,以使我的 dag_id 唯一,这样如果用户尝试编辑现有的 dag(他可能会更改/保留现有的 dag_id),dag_id 将自动更改并运行.

            if mode == "Days":
                cron = "{} {} */{} * *".format(time[1],time[0],step)
            elif mode == "Months":
                cron = "{} {} 1 1-12/{} *".format(time[1],time[0],step)

在上面的代码中,我将 'mode' 作为输入,它是一个字符串和可能的值 - 'Days' / 'Moths'。基于此,将决定 cron。

'time' 参数的格式为 hh:mm,即 dag 将在特定日期开始的时间,'step' 每隔 1 或 2 或 3 或....30 天重复一次。因此,如果用户从 UI 中选择“天”,我将显示在 1-31 范围内选择的选项,如果用户选择“月”,我将显示选项 1-12。

这是我的 DAG,每个编辑都有唯一的 dag_id-

                     dag_id = latest_schedule_request["dag_id"] + \
                     timezone_datetime_start.strftime('_%Y_%m_%d_%H_%M_%S') + \
                     timezone_datetime_end.strftime('_%Y_%m_%d_%H_%M_%S') + \
                     "_".join(tz.split('/')) + '_' + mode + '_' + str(step)

dag = DAG(dag_id, default_args = default_arguments, start_date = timezone_datetime_start - timedelta(days =step),
                    end_date =  timezone_datetime_end,
                    catchup = False,
                    schedule_interval = cron
                    )
task1 = PythonOperator(task_id= 'Some_task', python_callable = train_model, dag = dag)
task2 = PythonOperator(task_id= 'Delete_Past_Dags' , python_callable = delete_past_dags, dag = dag)

task2>>task1

task2 is - 删除所有过去的 dag,除了正在运行的一个 task1 is - some task

问题是,当 step = 1 时,使用此代码,这将在我打算开始的日期和准确的时间 hh:mm 上执行,并且还可以正确安排未来的运行。

但是当 step = 2 或 3 等时,dag 会立即被触发,而不是我通过前端 hh:mm 提供的时间。为什么这样?我相信我的任务是 timedelta(days = step),在这种情况下它不应该发生。

例如,对于 step =2,如果今天是 2021-12-06,则第一个 dag_run 立即被触发,即 2021-12-04 的运行,下一次运行显示 2021-12-05 而不是 2021-12-06 . 但是对于 step = 1 时的相同代码,这可以正常工作。有什么建议吗?

4

0 回答 0