4

我正在尝试使用 TaskGroup 创建动态任务,并将结果保存在变量中。该变量根据数据库查询每 N 分钟修改一次,但是当第二次修改该变量时,调度程序发生故障

基本上我需要根据查询中收到的唯一行数来创建任务。

以 TaskGroup(f"task") 作为任务:

    data_variable = Variable.get("df")
    data = data_variable

    try :
        if data != False and data !='none':
            df = pd.read_json(data)

            for field_id in df.field.unique():
             

                task1 = PythonOperator(
                   
                )
                task2 = PythonOperator(
                   
                )

               
                task1 >> task2

    except:
        pass

有没有办法用任务组来做到这一点?

4

1 回答 1

0

这是 Airflow 的反模式。

虽然您可以使用Variable.get("df")顶级代码,但您不应该这样做。使用任何数据库创建查询的变量/连接/任何其他代码只能在操作员范围内或使用 Jinja 模板完成。原因是 Airflow 会定期解析 DAG 文件(如果您没有更改默认值,则每 30 秒解析一次min_file_process_interval),因此每 30 秒与数据库交互的代码将导致该数据库的负载过重。对于其中一些情况,未来的气流版本会发出警告(请参阅PR

气流任务应尽可能保持静态(或缓慢变化)。

于 2021-03-16T08:48:51.340 回答