I tried your code and it works fine; I did not get any of the warnings you mention. I am running Airflow v2.1.2
using the official docker-compose setup.
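I found a few issues from older versions in the Airflow repo (PR, PR) related to the message you are getting, but they should be resolved by now. Try upgrading to the latest version of Airflow; that should fix the problem.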
Edit:
Here is what I got after copying and pasting your code into the Airflow instance I am running:
Graph view:

Logs:
airflow dags test test_tg 2021-08-24
Output:
[2021-08-24 15:59:19,247] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags
[2021-08-24 15:59:19,865] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.task_start 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:24,991] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=task_start
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[2021-08-24 15:59:25,003] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=task_start, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155925
[2021-08-24 15:59:25,043] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:25,083] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 7 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2021-08-24 15:59:25,122] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.run_1 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,164] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.run_2 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,241] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.run_1 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,279] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.run_2 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:29,814] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.run_1
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
1
[2021-08-24 15:59:29,849] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.run_1, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155929
[2021-08-24 15:59:29,876] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:29,912] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.run_2
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
2
[2021-08-24 15:59:29,932] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.run_2, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155929
[2021-08-24 15:59:29,965] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,027] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.run_1
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
1
[2021-08-24 15:59:30,060] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.run_1, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155930
[2021-08-24 15:59:30,087] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,124] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.run_2
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
2
[2021-08-24 15:59:30,151] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.run_2, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155930
[2021-08-24 15:59:30,185] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,238] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 3 | succeeded: 5 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 3
[2021-08-24 15:59:30,275] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.agg 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:30,310] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.agg 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:34,826] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.agg
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[1, 2]
[2021-08-24 15:59:34,833] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.agg, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155934
[2021-08-24 15:59:34,859] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:34,904] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.agg
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[1, 2]
[2021-08-24 15:59:34,915] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.agg, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155934
[2021-08-24 15:59:34,945] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:34,982] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 7 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-08-24 15:59:35,014] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.task_end 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:39,829] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=task_end
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[2021-08-24 15:59:39,841] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=task_end, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155939
[2021-08-24 15:59:39,867] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:39,890] {dagrun.py:444} INFO - Marking run <DagRun test_tg @ 2021-08-24 00:00:00+00:00: backfill__2021-08-24T00:00:00+00:00, externally triggered: False> successful
[2021-08-24 15:59:39,898] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 8 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-08-24 15:59:39,905] {backfill_job.py:831} INFO - Backfill done. Exiting.
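If you want to check a run like this without reading the whole log, the `[backfill progress]` lines can be parsed. The following is only a minimal sketch: it assumes the line format is exactly what appears in the output above (it may differ in other Airflow versions), and `parse_backfill_progress` and `run_succeeded` are hypothetical helper names, not part of Airflow.

```python
import re

# Assumption: progress lines look like the ones in the output above.
PROGRESS_RE = re.compile(
    r"\[backfill progress\] \| finished run (?P<done>\d+) of (?P<total>\d+)"
    r" \| tasks waiting: (?P<waiting>\d+)"
    r" \| succeeded: (?P<succeeded>\d+)"
    r" \| running: (?P<running>\d+)"
    r" \| failed: (?P<failed>\d+)"
)


def parse_backfill_progress(log_text):
    """Return one dict of integer counters per progress line, in log order."""
    return [
        {key: int(value) for key, value in match.groupdict().items()}
        for match in PROGRESS_RE.finditer(log_text)
    ]


def run_succeeded(log_text):
    """True if the last progress line shows every run finished with no failures."""
    progress = parse_backfill_progress(log_text)
    if not progress:
        return False
    last = progress[-1]
    return (
        last["done"] == last["total"]
        and last["failed"] == 0
        and last["waiting"] == 0
    )


# The final progress line copied from the output above.
SAMPLE_LOG = (
    "[2021-08-24 15:59:39,898] {backfill_job.py:388} INFO - [backfill progress] "
    "| finished run 1 of 1 | tasks waiting: 0 | succeeded: 8 | running: 0 "
    "| failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0"
)

print(run_succeeded(SAMPLE_LOG))
```

In practice you would pass in the captured output of `airflow dags test test_tg 2021-08-24` instead of `SAMPLE_LOG`.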
airflow info
Output:
default@91172692e679:/opt/airflow$ airflow info
Apache Airflow
version | 2.1.2
executor | CeleryExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn | postgresql+psycopg2://airflow:airflow@postgres/airflow
dags_folder | /opt/airflow/dags
plugins_folder | /opt/airflow/plugins
base_log_folder | /opt/airflow/logs
remote_base_log_folder |
...
...
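To compare your environment with mine, the `key | value` rows of `airflow info` can be read into a dictionary. This is a rough sketch that assumes the output is laid out like the excerpt above; `parse_airflow_info` is a hypothetical helper, not an Airflow function.

```python
def parse_airflow_info(text):
    """Parse 'key | value' rows from `airflow info` output into a dict."""
    info = {}
    for line in text.splitlines():
        key, sep, value = line.partition("|")
        # Skip lines without a separator, such as the "Apache Airflow" heading.
        if sep and key.strip():
            info[key.strip()] = value.strip()
    return info


# A few rows copied from the output above.
SAMPLE_INFO = """\
Apache Airflow
version | 2.1.2
executor | CeleryExecutor
remote_base_log_folder |
"""

info = parse_airflow_info(SAMPLE_INFO)
print(info["version"], info["executor"])
```

If the version reported on your side is older than 2.1.2, that is consistent with the suggestion to upgrade.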