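One of my DAGs is scheduled to run every 5 minutes on weekdays. The task simply syncs some files; nothing complicated. But since yesterday, some runs have been stuck in the "running" state. It looks random: I can't tell why some runs succeed while others hang in "running".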
Airflow version: 2.0.1
When it succeeds, the job takes no more than a few seconds.
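Because a healthy run finishes within seconds, any run still in the "running" state well past that is a candidate for being stuck. The sketch below flags such runs from a plain list of records; the `RunRecord` type, the `find_stuck_runs` function, the 10-minute default threshold and the sample runs are all hypothetical, and how you fetch the records (UI, REST API, metadata DB) is deliberately left open.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class RunRecord:
    # Hypothetical record shape: one DAG run as (run_id, state, start time).
    run_id: str
    state: str
    start_date: datetime


def find_stuck_runs(runs: List[RunRecord], now: datetime,
                    threshold: timedelta = timedelta(minutes=10)) -> List[str]:
    """Return run_ids still 'running' for longer than `threshold`.

    A healthy run of this job takes a few seconds, so anything that has
    been running for minutes is suspicious.
    """
    return [
        r.run_id
        for r in runs
        if r.state == "running" and now - r.start_date > threshold
    ]


if __name__ == "__main__":
    utc = timezone.utc
    now = datetime(2021, 12, 23, 1, 0, tzinfo=utc)
    # Made-up sample data for illustration only, not taken from the post.
    runs = [
        RunRecord("run-a", "success", datetime(2021, 12, 23, 0, 35, 1, tzinfo=utc)),
        RunRecord("run-b", "running", datetime(2021, 12, 23, 0, 40, 2, tzinfo=utc)),
        RunRecord("run-c", "running", datetime(2021, 12, 23, 0, 55, 3, tzinfo=utc)),
    ]
    print(find_stuck_runs(runs, now))  # ['run-b']
```

`run-c` is not reported because it has only been running for about five minutes, below the assumed threshold; tune the threshold to your own tolerance.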
The DAG code:
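import datetime

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

args = {
    'owner': 'xxx',
    'start_date': datetime.datetime(2020, 9, 1),
}

# Every 5 minutes, Monday to Friday; missed intervals are not backfilled.
dag = DAG('SYNC-JOB',
          default_args=args,
          schedule_interval='*/5 * * * 1-5',
          catchup=False)

ssh_touch = SSHOperator(
    task_id='sync_data',
    ssh_conn_id='ssh-bo-localhost',
    # The trailing space stops Airflow from treating a string ending in
    # ".sh" as a Jinja template file to load and render.
    command="/folder/bin/syncjob.sh ",
    dag=dag
)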
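The schedule `*/5 * * * 1-5` fires every five minutes, Monday through Friday only. As an illustration that needs neither Airflow nor a cron library, the hypothetical helper below enumerates the next fire times for that one expression. It is hand-rolled for this pattern only (not a general cron parser), and it accounts for cron numbering Monday as 1 while Python's `datetime.weekday()` numbers it 0.

```python
from datetime import datetime, timedelta
from typing import List


def next_fire_times(after: datetime, count: int) -> List[datetime]:
    """Next `count` times matching cron '*/5 * * * 1-5', strictly after `after`.

    Only handles this one expression: minute divisible by 5 and a weekday
    of Monday-Friday. Cron's day-of-week 1-5 (Mon=1) corresponds to
    Python's weekday() 0-4 (Mon=0).
    """
    # Start at the first whole minute strictly after `after`.
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    times: List[datetime] = []
    while len(times) < count:
        if t.minute % 5 == 0 and t.weekday() <= 4:
            times.append(t)
        t += timedelta(minutes=1)
    return times


if __name__ == "__main__":
    # Friday 2021-12-24 23:50: the weekend is skipped, so after 23:55
    # the next slot is Monday 2021-12-27 00:00.
    for t in next_fire_times(datetime(2021, 12, 24, 23, 50), 3):
        print(t)
```

Stepping minute by minute is deliberately simple; even across a weekend it only loops a few thousand times.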
For the unsuccessful runs, no log file is created on the worker. For the successful ones, the worker log looks like this:
airflow@node04:/opt/airflow/logs/SYNC-JOB/sync_data$ cat 2021-12-23T00:30:00+00:00/1.log
[2021-12-23 09:35:01,887] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: SYNC-JOB.sync_data 2021-12-23T00:30:00+00:00 [queued]>
[2021-12-23 09:35:01,907] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: SYNC-JOB.sync_data 2021-12-23T00:30:00+00:00 [queued]>
[2021-12-23 09:35:01,907] {taskinstance.py:1042} INFO -
--------------------------------------------------------------------------------
[2021-12-23 09:35:01,907] {taskinstance.py:1043} INFO - Starting attempt 1 of 1
[2021-12-23 09:35:01,907] {taskinstance.py:1044} INFO -
--------------------------------------------------------------------------------
[2021-12-23 09:35:01,938] {taskinstance.py:1063} INFO - Executing <Task(SSHOperator): sync_data> on 2021-12-23T00:30:00+00:00
[2021-12-23 09:35:01,944] {standard_task_runner.py:52} INFO - Started process 73 to run task
[2021-12-23 09:35:01,958] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'SYNC-JOB', 'sync_data', '2021-12-23T00:30:00+00:00', '--job-id', '2890710', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/sync-jobs.zip', '--cfg-path', '/tmp/tmpnhhip9dl', '--error-file', '/tmp/tmplvcy0bm5']
[2021-12-23 09:35:01,958] {standard_task_runner.py:77} INFO - Job 2890710: Subtask sync_data
[2021-12-23 09:35:02,033] {logging_mixin.py:104} INFO - Running <TaskInstance: SYNC-JOB.sync_data 2021-12-23T00:30:00+00:00 [running]> on host node04
[2021-12-23 09:35:02,150] {taskinstance.py:1257} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=abhishek
AIRFLOW_CTX_DAG_ID=SYNC-JOB
AIRFLOW_CTX_TASK_ID=sync_data
AIRFLOW_CTX_EXECUTION_DATE=2021-12-23T00:30:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-23T00:30:00+00:00
[2021-12-23 09:35:02,151] {ssh.py:90} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.
[2021-12-23 09:35:02,175] {base.py:74} INFO - Using connection to: id: ssh-bo-localhost. Host: localhost, Port: None, Schema: , Login: xxxxx, Password: None, extra: XXXXXXXX
[2021-12-23 09:35:02,177] {ssh.py:207} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks
[2021-12-23 09:35:02,196] {transport.py:1819} INFO - Connected (version 2.0, client OpenSSH_7.4)
[2021-12-23 09:35:02,295] {transport.py:1819} INFO - Authentication (publickey) successful!
[2021-12-23 09:35:02,296] {ssh.py:109} INFO - Running command: syncjob.sh
[2021-12-23 09:35:03,096] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=SYNC-JOB, task_id=sync_data, execution_date=20211223T003000, start_date=20211223T003501, end_date=20211223T003503
[2021-12-23 09:35:03,550] {taskinstance.py:1220} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-12-23 09:35:03,572] {local_task_job.py:146} INFO - Task exited with return code 0
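The `Marking task as SUCCESS` line carries compact timestamps that confirm the numbers above: the task ran for about two seconds, and it started about five minutes after its `execution_date`, which is consistent with Airflow 2.0 triggering a scheduled run at the end of its schedule interval. The hypothetical `parse_success_line` helper below extracts those fields with a regex written for that one line format only.

```python
import re
from datetime import datetime
from typing import Dict

# Matches key=YYYYMMDDTHHMMSS pairs as printed in the "Marking task as SUCCESS" line.
_TS = re.compile(r"(execution_date|start_date|end_date)=(\d{8}T\d{6})")


def parse_success_line(line: str) -> Dict[str, datetime]:
    """Extract execution_date, start_date and end_date from the log line."""
    return {k: datetime.strptime(v, "%Y%m%dT%H%M%S") for k, v in _TS.findall(line)}


# Copied from the worker log above.
line = ("Marking task as SUCCESS. dag_id=SYNC-JOB, task_id=sync_data, "
        "execution_date=20211223T003000, start_date=20211223T003501, "
        "end_date=20211223T003503")
ts = parse_success_line(line)
print("duration:", ts["end_date"] - ts["start_date"])   # duration: 0:00:02
print("lag:", ts["start_date"] - ts["execution_date"])   # lag: 0:05:01
```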
This successful run is followed by a mix of unsuccessful and successful runs; the unsuccessful ones are still "running".
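Since the stuck runs leave no log file on the worker, one quick way to list the affected execution dates is to compare the expected schedule against the log directories. This sketch assumes the layout visible in the worker log path above (`<base>/SYNC-JOB/sync_data/<execution_date>/1.log`); the `missing_log_dates` function is hypothetical and only checks try number 1.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List


def missing_log_dates(base: Path, dag_id: str, task_id: str,
                      execution_dates: Iterable[datetime]) -> List[str]:
    """Return execution dates (ISO strings) that have no 1.log file.

    Assumes the layout seen in the post:
    <base>/<dag_id>/<task_id>/<execution_date isoformat>/1.log
    """
    task_dir = base / dag_id / task_id
    missing = []
    for d in execution_dates:
        stamp = d.isoformat()  # e.g. 2021-12-23T00:30:00+00:00 for a UTC-aware datetime
        if not (task_dir / stamp / "1.log").is_file():
            missing.append(stamp)
    return missing


if __name__ == "__main__":
    import tempfile

    utc = timezone.utc
    runs = [datetime(2021, 12, 23, 0, 30, tzinfo=utc),
            datetime(2021, 12, 23, 0, 35, tzinfo=utc)]
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        # Simulate only the first run having written its log.
        ok = base / "SYNC-JOB" / "sync_data" / runs[0].isoformat()
        ok.mkdir(parents=True)
        (ok / "1.log").write_text("...")
        print(missing_log_dates(base, "SYNC-JOB", "sync_data", runs))
        # ['2021-12-23T00:35:00+00:00']
```

On the real worker you would point `base` at `/opt/airflow/logs` and feed in the execution dates you expect.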
The scheduler log looks like this:
[2021-12-23 09:27:42,526] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['SYNC-JOB']) retrieved from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:27:42,543] {logging_mixin.py:104} INFO - [2021-12-23 09:27:42,543] {dag.py:1818} INFO - Sync 1 DAGs
[2021-12-23 09:27:42,603] {logging_mixin.py:104} INFO - [2021-12-23 09:27:42,603] {dag.py:2273} INFO - Setting next_dagrun for SYNC-JOB to 2021-12-23T00:25:00+00:00
[2021-12-23 09:27:42,618] {scheduler_job.py:190} INFO - Processing /opt/airflow/dags/sync-job/sync_data.py took 0.142 seconds
[2021-12-23 09:29:43,015] {scheduler_job.py:182} INFO - Started process (PID=2852) to work on /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:29:43,016] {scheduler_job.py:629} INFO - Processing file /opt/airflow/dags/sync-job/sync_data.py for tasks to queue
[2021-12-23 09:29:43,017] {logging_mixin.py:104} INFO - [2021-12-23 09:29:43,017] {dagbag.py:448} INFO - Filling up the DagBag from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:29:43,062] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['SYNC-JOB']) retrieved from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:29:43,081] {logging_mixin.py:104} INFO - [2021-12-23 09:29:43,081] {dag.py:1818} INFO - Sync 1 DAGs
[2021-12-23 09:29:43,142] {logging_mixin.py:104} INFO - [2021-12-23 09:29:43,141] {dag.py:2273} INFO - Setting next_dagrun for SYNC-JOB to 2021-12-23T00:25:00+00:00
[2021-12-23 09:29:43,153] {scheduler_job.py:190} INFO - Processing /opt/airflow/dags/sync-job/sync_data.py took 0.142 seconds
[2021-12-23 09:31:43,292] {scheduler_job.py:182} INFO - Started process (PID=2922) to work on /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:31:43,293] {scheduler_job.py:629} INFO - Processing file /opt/airflow/dags/sync-job/sync_data.py for tasks to queue
[2021-12-23 09:31:43,294] {logging_mixin.py:104} INFO - [2021-12-23 09:31:43,294] {dagbag.py:448} INFO - Filling up the DagBag from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:31:43,340] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['SYNC-JOB']) retrieved from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:31:43,359] {logging_mixin.py:104} INFO - [2021-12-23 09:31:43,359] {dag.py:1818} INFO - Sync 1 DAGs
[2021-12-23 09:31:43,417] {logging_mixin.py:104} INFO - [2021-12-23 09:31:43,417] {dag.py:2273} INFO - Setting next_dagrun for SYNC-JOB to 2021-12-23T00:30:00+00:00
[2021-12-23 09:31:43,428] {scheduler_job.py:190} INFO - Processing /opt/airflow/dags/sync-job/sync_data.py took 0.141 seconds
[2021-12-23 09:33:43,671] {scheduler_job.py:182} INFO - Started process (PID=3004) to work on /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:33:43,674] {scheduler_job.py:629} INFO - Processing file /opt/airflow/dags/sync-job/sync_data.py for tasks to queue
[2021-12-23 09:33:43,675] {logging_mixin.py:104} INFO - [2021-12-23 09:33:43,675] {dagbag.py:448} INFO - Filling up the DagBag from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:33:43,717] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['SYNC-JOB']) retrieved from /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:33:43,745] {logging_mixin.py:104} INFO - [2021-12-23 09:33:43,745] {dag.py:1818} INFO - Sync 1 DAGs
[2021-12-23 09:33:43,811] {logging_mixin.py:104} INFO - [2021-12-23 09:33:43,811] {dag.py:2273} INFO - Setting next_dagrun for SYNC-JOB to 2021-12-23T00:30:00+00:00
[2021-12-23 09:33:43,824] {scheduler_job.py:190} INFO - Processing /opt/airflow/dags/sync-job/sync_data.py took 0.159 seconds
[2021-12-23 09:35:44,098] {scheduler_job.py:182} INFO - Started process (PID=3079) to work on /opt/airflow/dags/sync-job/sync_data.py
[2021-12-23 09:35:44,100] {scheduler_job.py:629} INFO - Processing file /opt/airflow/dags/sync-job/sync_data.py for tasks to queue
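In this excerpt the scheduler re-parses the DAG file roughly every two minutes, and `next_dagrun` advances from 2021-12-23T00:25:00+00:00 to 2021-12-23T00:30:00+00:00. To follow that value over a longer stretch of the log, for example around the time a run got stuck, a small filter like the hypothetical `next_dagrun_history` below can pull out each (logged-at, dag_id, next_dagrun) triple. It assumes the exact message format shown above and tolerates an optional leading `NN:` line-number prefix.

```python
import re
from typing import Iterable, List, Tuple

# Assumed format: "[<timestamp>] ... Setting next_dagrun for <dag_id> to <iso datetime>",
# optionally preceded by a "NN:" line-number prefix.
_NEXT = re.compile(
    r"^(?:\d+:)?\[(?P<logged>[^\]]+)\].*Setting next_dagrun for (?P<dag>\S+) to (?P<next>\S+)"
)


def next_dagrun_history(lines: Iterable[str]) -> List[Tuple[str, str, str]]:
    """Return (logged_at, dag_id, next_dagrun) for every matching log line."""
    out = []
    for line in lines:
        m = _NEXT.search(line)
        if m:
            out.append((m.group("logged"), m.group("dag"), m.group("next")))
    return out


if __name__ == "__main__":
    # Two lines copied from the scheduler log above, plus one non-matching line.
    sample = [
        "[2021-12-23 09:27:42,603] {logging_mixin.py:104} INFO - [2021-12-23 09:27:42,603] "
        "{dag.py:2273} INFO - Setting next_dagrun for SYNC-JOB to 2021-12-23T00:25:00+00:00",
        "[2021-12-23 09:27:42,618] {scheduler_job.py:190} INFO - Processing "
        "/opt/airflow/dags/sync-job/sync_data.py took 0.142 seconds",
        "[2021-12-23 09:31:43,417] {logging_mixin.py:104} INFO - [2021-12-23 09:31:43,417] "
        "{dag.py:2273} INFO - Setting next_dagrun for SYNC-JOB to 2021-12-23T00:30:00+00:00",
    ]
    for row in next_dagrun_history(sample):
        print(*row)
```

In practice you would pass an open scheduler log file handle instead of the inline sample, since file objects iterate line by line.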
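I tried deleting all the running jobs and restarting the Airflow scheduler and worker, but it didn't help. Also, since I'm on 2.0.1, the issue may differ from similar ones reported against earlier versions.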