我目前在 Airflow DAG 执行方面遇到一些问题。这是错误代码,我附上了气流 UI 行为:
DAG 定义:default_args = { 'owner': 'Analytics', 'depends_on_past': False, 'retries': 3, 'retry_delay': timedelta(minutes=1), 'start_date': suite_start_date } main_dag = DAG(dag_id=suite_name , default_args=default_args, schedule_interval=suite_schedule, dagrun_timeout=timedelta(minutes=60), max_active_runs=1)
我正在从用户输入创建动态 DAG:气流动态 DAG 和任务 ID
start_time 和 schedule_interval 在我的 Airflow 设置中是动态的。
19, 52, 22, 495538)), ('Process_Hourly.process_hourly_1_meta', 'install_device_attribution_ff', datetime.datetime(2017, 11, 14, 19, 52, 22, 495538))} [2017-11-14 20:26 :34,622] {base_task_runner.py:95} 信息 - 子任务:[2017-11-14 20:26:34,622] {base_task_runner.py:95} 信息 - 子任务:/usr/local/lib/python3.6/site- packages/airflow/www/app.py:23:FlaskWTFDeprecationWarning:“flask_wtf.CsrfProtect”已重命名为“CSRFProtect”并将在 1.0 中删除。[2017-11-14 20:26:34,622] {base_task_runner.py:95} 信息 - 子任务:csrf = CsrfProtect() [2017-11-14 20:26:34,622] {base_task_runner.py:95} 信息 - 子任务:/usr/local/lib/python3.6/site-packages/airflow/ti_deps/deps/base_ti_dep.py:100:DeprecationWarning:生成器“_get_dep_statuses”引发了 StopIteration [2017-11-14 20:26:34,622] {base_task_runner .py:95} 信息 - 子任务: